
Improve ackManager.completeTask performance by two orders of magnitude#5216

Merged
tdeebswihart merged 7 commits into main from tds/improve-ack-manager
Dec 13, 2023

@tdeebswihart

What changed?

I replaced the outstandingTasks map with an ordered treemap and optimized
completeTask to only scan what was necessary to update the ack level.

Why?

The old implementation of completeTask had to scan the entire task
map to move the ack level, which performed terribly.
By storing tasks in an ordered set we can limit the scan by
stopping at the first unacked task.

This trades addTask performance for completeTask performance, but since all
added tasks are presumably completed eventually, we should be fine with about
1/3 the performance on addTask in exchange for roughly 214x the completeTask
performance (29,293,019 ns/op down to 136,614 ns/op in the runs below). With
this change both operations run in about the same amount of time.
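
To make that cost concrete, here is a minimal sketch of the kind of full scan described above. It is not the original code: `naiveAckManager` and its methods are hypothetical names, and the collect-and-sort step is only inferred from the `maps.Keys` / `util.SortSlice` fragments that show up in the review comments further down.

```go
package main

import (
	"fmt"
	"sort"
)

// naiveAckManager is a hypothetical stand-in for the pre-change design:
// outstanding tasks live in an unordered map of TaskID -> acked.
type naiveAckManager struct {
	outstanding map[int64]bool
	ackLevel    int64
}

func newNaiveAckManager() *naiveAckManager {
	return &naiveAckManager{outstanding: make(map[int64]bool), ackLevel: -1}
}

func (m *naiveAckManager) addTask(id int64) { m.outstanding[id] = false }

// completeTask marks id as acked, then sorts every outstanding ID to find
// how far the ack level can advance. Collecting and sorting touches the
// whole map on every call, even when the ack level cannot move at all.
func (m *naiveAckManager) completeTask(id int64) int64 {
	if acked, found := m.outstanding[id]; !found || acked {
		return m.ackLevel
	}
	m.outstanding[id] = true

	ids := make([]int64, 0, len(m.outstanding))
	for k := range m.outstanding {
		ids = append(ids, k)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

	for _, k := range ids {
		if !m.outstanding[k] {
			break // first unacked task: the ack level cannot pass it
		}
		m.ackLevel = k
		delete(m.outstanding, k)
	}
	return m.ackLevel
}

func main() {
	m := newNaiveAckManager()
	for _, id := range []int64{1, 2, 3} {
		m.addTask(id)
	}
	fmt.Println(m.completeTask(2)) // task 1 is still outstanding
	fmt.Println(m.completeTask(1)) // tasks 1 and 2 are now both acked
}
```

With an ordered container the collect-and-sort step disappears, and only the early-exit loop at the end remains.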

Before:

$ go test -bench=AckManager ./service/matching/... -run=FooBarBaz
goos: darwin
goarch: arm64
pkg: go.temporal.io/server/service/matching
BenchmarkAckManager_AddTask-12         	  22768	    52206 ns/op
BenchmarkAckManager_CompleteTask-12    	     38	 29293019 ns/op

After:

$ go test -bench=AckManager ./service/matching -run=FooBarBaz
goos: darwin
goarch: arm64
pkg: go.temporal.io/server/service/matching
BenchmarkAckManager_AddTask-12         	   8127	   147226 ns/op
BenchmarkAckManager_CompleteTask-12    	   8626	   136614 ns/op

How did you test it?

I added both tests and benchmarks to ensure the ackManager behaves as before.

Potential risks

None.

Is hotfix candidate?

No

Tests before refactors!
@tdeebswihart tdeebswihart requested a review from yycptt December 8, 2023 21:50
@tdeebswihart tdeebswihart requested a review from a team as a code owner December 8, 2023 21:50
@dnr

dnr commented Dec 8, 2023

Not to be too pedantic since it's already a huge improvement, but I'm curious how the performance would compare to using https://pkg.go.dev/github.com/google/btree, specifically the ordered generic version (NewOrderedG). That's the ordered map for Go I've used a few times in the past

@dnr

dnr commented Dec 8, 2023

Oh, I see, you picked that one since it's already used in the test. Well, I'm still curious. If it's faster maybe we can change both

@tdeebswihart

tdeebswihart commented Dec 8, 2023

> Not to be too pedantic since it's already a huge improvement, but I'm curious how the performance would compare to using https://pkg.go.dev/github.com/google/btree, specifically the ordered generic version (NewOrderedG). That's the ordered map for Go I've used a few times in the past

Looks like this doesn't allow us to store values alongside the keys, hrm. I'll build it with this alongside a map of K->V to see how it performs

@dnr

dnr commented Dec 8, 2023

> Looks like this doesn't allow us to store values alongside the keys, hrm. I'll build it with this alongside a map of K->V to see how it performs

Oh right, I forgot you need the bool. You can totally store values next to the keys, but yeah, you can't use the Ordered one in that case, you need to make a struct with two members and implement a Less

@tdeebswihart

> Not to be too pedantic since it's already a huge improvement, but I'm curious how the performance would compare to using https://pkg.go.dev/github.com/google/btree, specifically the ordered generic version (NewOrderedG). That's the ordered map for Go I've used a few times in the past

The implementation using btree and a second map to actually store the ack status performs far worse than gods' treemap:

$ go test -bench=AckManager ./service/matching -run=FooBarBaz
goos: darwin
goarch: arm64
pkg: go.temporal.io/server/service/matching
BenchmarkAckManager_AddTask-12         	   3495	   296339 ns/op
BenchmarkAckManager_CompleteTask-12    	   3706	   321754 ns/op

Here's the diff if you want to vet it:

diff --git a/service/matching/ack_manager.go b/service/matching/ack_manager.go
index a2d0afc67..25cf972f1 100644
--- a/service/matching/ack_manager.go
+++ b/service/matching/ack_manager.go
@@ -27,7 +27,7 @@ package matching
 import (
 	"sync"
 
-	"github.com/emirpasic/gods/maps/treemap"
+	"github.com/google/btree"
 	"go.uber.org/atomic"
 
 	"go.temporal.io/server/common/log"
@@ -50,9 +50,10 @@ func int64Comparator(a, b interface{}) int {
 // Used to convert out of order acks into ackLevel movement.
 type ackManager struct {
 	sync.RWMutex
-	outstandingTasks *treemap.Map // TaskID->acked
-	readLevel        int64        // Maximum TaskID inserted into outstandingTasks
-	ackLevel         int64        // Maximum TaskID below which all tasks are acked
+	outstandingTasks *btree.BTreeG[int64] // TaskID->acked
+	ackStatus        map[int64]bool
+	readLevel        int64 // Maximum TaskID inserted into outstandingTasks
+	ackLevel         int64 // Maximum TaskID below which all tasks are acked
 	backlogCounter   atomic.Int64
 	logger           log.Logger
 }
@@ -60,7 +61,8 @@ type ackManager struct {
 func newAckManager(logger log.Logger) ackManager {
 	return ackManager{
 		logger:           logger,
-		outstandingTasks: treemap.NewWith(int64Comparator),
+		outstandingTasks: btree.NewOrderedG[int64](2),
+		ackStatus:        make(map[int64]bool),
 		readLevel:        -1,
 		ackLevel:         -1}
 }
@@ -75,10 +77,10 @@ func (m *ackManager) addTask(taskID int64) {
 			tag.ReadLevel(m.readLevel))
 	}
 	m.readLevel = taskID
-	if _, found := m.outstandingTasks.Get(taskID); found {
+	if _, found := m.outstandingTasks.ReplaceOrInsert(taskID); found {
 		m.logger.Fatal("Already present in outstanding tasks", tag.TaskID(taskID))
 	}
-	m.outstandingTasks.Put(taskID, false)
+	m.ackStatus[taskID] = false
 	m.backlogCounter.Inc()
 }
 
@@ -131,14 +133,15 @@ func (m *ackManager) setAckLevel(ackLevel int64) {
 func (m *ackManager) completeTask(taskID int64) int64 {
 	m.Lock()
 	defer m.Unlock()
-	macked, found := m.outstandingTasks.Get(taskID)
-	acked := macked.(bool)
-	if !found || acked {
+	if !m.outstandingTasks.Has(taskID) {
+		return m.ackLevel
+	}
+	acked, _ := m.ackStatus[taskID]
+	if acked {
 		// don't adjust ack level if nothing has changed
 		return m.ackLevel
 	}
-
-	m.outstandingTasks.Put(taskID, true)
+	m.ackStatus[taskID] = true
 	m.backlogCounter.Dec()
 
 	min, _ := m.outstandingTasks.Min()
@@ -147,20 +150,19 @@ func (m *ackManager) completeTask(taskID int64) int64 {
 	}
 
 	// We've acked the minimum task, so should adjust the ack level as far as we can
-	iter := m.outstandingTasks.Iterator()
-	iter.Begin()
 	// deletion invalidates the iterator
 	var toDel []int64
-	for iter.Next() {
-		acked := iter.Value().(bool)
-		if !acked {
-			break
+	m.outstandingTasks.Ascend(func(key int64) bool {
+		if acked := m.ackStatus[key]; !acked {
+			return false
 		}
-		m.ackLevel = iter.Key().(int64)
+		m.ackLevel = key
 		toDel = append(toDel, m.ackLevel)
-	}
+		return true
+	})
 	for i := 0; i < len(toDel); i++ {
-		m.outstandingTasks.Remove(toDel[i])
+		m.outstandingTasks.Delete(toDel[i])
+		delete(m.ackStatus, toDel[i])
 	}
 
 	return m.ackLevel

@tdeebswihart

> > Looks like this doesn't allow us to store values alongside the keys, hrm. I'll build it with this alongside a map of K->V to see how it performs
>
> Oh right, I forgot you need the bool. You can totally store values next to the keys, but yeah, you can't use the Ordered one in that case, you need to make a struct with two members and implement a Less

Fair. I'll try that and see if it performs better

@tdeebswihart tdeebswihart requested a review from dnr December 11, 2023 18:05
@tdeebswihart tdeebswihart requested a review from yycptt December 11, 2023 23:05
"go.temporal.io/server/common/util"
)

func int64Comparator(a, b interface{}) int {

I see there's a github.com/emirpasic/gods/utils.Int64Comparator.. why copy it here?


Good question. I copied it from the other place in our codebase that uses the gods library


can we replace them both with that, then?


Done.

taskIDs := maps.Keys(m.outstandingTasks)
util.SortSlice(taskIDs)
min, _ := m.outstandingTasks.Min()
if taskID != min {

taskID here is int64 and min is interface{}.. I don't think this comparison is valid. actually I'm not sure how the compiler accepts this...
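
On why the compiler accepts it: the Go spec allows comparing a non-interface value with an interface value when the non-interface type is comparable and implements the interface, and `int64` trivially implements `interface{}`. The comparison then checks the dynamic type first and the value second. A small sketch, where `sameTask` is a hypothetical helper and not part of the PR:

```go
package main

import "fmt"

// sameTask compares an int64 task ID against a key handed back as
// interface{}, the way a pre-generics ordered map returns its keys.
// This compiles: the operands are compared by dynamic type, then value.
func sameTask(taskID int64, key interface{}) bool {
	return taskID == key
}

func main() {
	fmt.Println(sameTask(7, int64(7))) // dynamic type int64, same value
	fmt.Println(sameTask(7, int(7)))   // dynamic type int, so not equal
	fmt.Println(sameTask(7, nil))      // nil interface never equals an int64
}
```

So the check is valid as long as the map only ever holds `int64` keys. A key of any other integer type would quietly compare as unequal, which is a reasonable argument for the explicit `min.(int64)` assertion.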

delete(m.outstandingTasks, taskID)
} else {
return m.ackLevel
// We've acked the minimum task, so should adjust the ack level as far as we can

@dnr dnr Dec 13, 2023


wouldn't it be simpler to write the whole thing (starting from line 146) like this:

for {
	min, acked := m.outstandingTasks.Min()
	if min == nil || !acked.(bool) {
		return m.ackLevel
	}
	m.outstandingTasks.Remove(min)
}

note that we currently never complete tasks out of order
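
The shape of that loop can be tried out without the treemap dependency. The sketch below is an illustration under stated assumptions, not the merged code: `orderedTasks` is a hypothetical stand-in (a slice that stays sorted because IDs are added in increasing order), and the `ackLevel` assignment inside the loop is assumed, since the snippet as quoted removes the minimum without showing where the level is updated.

```go
package main

import (
	"fmt"
	"sort"
)

type entry struct {
	id    int64
	acked bool
}

// orderedTasks is a hypothetical stand-in for the ordered map. The slice
// stays sorted only because callers add IDs in increasing order.
type orderedTasks struct{ entries []entry }

// ack marks id as acked and reports whether anything changed.
func (t *orderedTasks) ack(id int64) bool {
	i := sort.Search(len(t.entries), func(i int) bool { return t.entries[i].id >= id })
	if i == len(t.entries) || t.entries[i].id != id || t.entries[i].acked {
		return false
	}
	t.entries[i].acked = true
	return true
}

type ackManager struct {
	tasks    orderedTasks
	ackLevel int64
}

func newAckManager() *ackManager { return &ackManager{ackLevel: -1} }

func (m *ackManager) addTask(id int64) {
	m.tasks.entries = append(m.tasks.entries, entry{id: id})
}

// completeTask acks id, then pops acked tasks off the front until the
// smallest remaining task is unacked or nothing is left.
func (m *ackManager) completeTask(id int64) int64 {
	if !m.tasks.ack(id) {
		return m.ackLevel
	}
	for len(m.tasks.entries) > 0 && m.tasks.entries[0].acked {
		m.ackLevel = m.tasks.entries[0].id // assumed step, not in the quoted snippet
		m.tasks.entries = m.tasks.entries[1:]
	}
	return m.ackLevel
}

func main() {
	m := newAckManager()
	for _, id := range []int64{1, 2, 3, 4} {
		m.addTask(id)
	}
	fmt.Println(m.completeTask(3)) // out of order: level stays put
	fmt.Println(m.completeTask(1)) // level moves to 1, stops at unacked 2
	fmt.Println(m.completeTask(2)) // level jumps past 2 and 3
}
```

The loop only ever looks at the current minimum, so a completion that cannot move the ack level costs one lookup rather than a scan.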


You're right, I overcomplicated this

@tdeebswihart tdeebswihart requested a review from dnr December 13, 2023 22:28
"go.temporal.io/server/common/util"
)

func int64Comparator(a, b interface{}) int {

can we replace them both with that, then?

Comment on lines +151 to +154
min, _ := m.outstandingTasks.Min()
if taskID != min.(int64) {
return m.ackLevel
}

let's just get rid of this check? the loop below will take care of it

I was lazy on my previous commit and should have done this then
@tdeebswihart tdeebswihart requested a review from dnr December 13, 2023 22:45
@tdeebswihart tdeebswihart merged commit 8796381 into main Dec 13, 2023
@tdeebswihart tdeebswihart deleted the tds/improve-ack-manager branch December 13, 2023 23:14