From e2fab053a8e51c3d62a2c85c036062a31ba17984 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 11:04:47 +0100 Subject: [PATCH 01/10] fix: fixup/down for the remove path Signed-off-by: Valery Piashchynski --- binary_heap.go | 67 ++++----- binary_heap_test.go | 340 +++++++++++++++++++++++++++++++++++++++++++- monotonic_stack.go | 2 +- 3 files changed, 362 insertions(+), 47 deletions(-) diff --git a/binary_heap.go b/binary_heap.go index 58d723d..cd81c58 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -2,7 +2,6 @@ package priorityqueue import ( "sync" - "sync/atomic" ) // Item represents a binary heap item @@ -21,7 +20,6 @@ type BinHeap[T Item] struct { exists map[string]struct{} st *stack // find a way to use a pointer to the raw data - len uint64 maxLen uint64 cond sync.Cond } @@ -31,14 +29,13 @@ func NewBinHeap[T Item](maxLen uint64) *BinHeap[T] { items: make([]T, 0, 1000), exists: make(map[string]struct{}, 1000), st: newStack(), - len: 0, maxLen: maxLen, cond: sync.Cond{L: &sync.Mutex{}}, } } func (bh *BinHeap[T]) fixUp() { - k := bh.len - 1 + k := uint64(len(bh.items)) - 1 p := (k - 1) >> 1 // k-1 / 2 for k > 0 { @@ -84,11 +81,8 @@ func (bh *BinHeap[T]) Exists(id string) bool { bh.cond.L.Lock() defer bh.cond.L.Unlock() - if _, ok := bh.exists[id]; ok { - return true - } - - return false + _, ok := bh.exists[id] + return ok } // Remove removes all elements with the provided ID and returns the slice with them @@ -108,16 +102,21 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { } ids := bh.st.Indices() - adjusment := 0 + adjustment := 0 for i := range ids { - start := ids[i][0] - adjusment - end := ids[i][1] - adjusment + start := ids[i][0] - adjustment + end := ids[i][1] - adjustment bh.items = append(bh.items[:start], bh.items[end+1:]...) - adjusment += end - start + 1 + adjustment += end - start + 1 + } + + // re-heapify after compaction (Floyd's algorithm) + n := len(bh.items) + for i := n/2 - 1; i >= 0; i-- { + bh.fixDown(i, n-1) } - atomic.StoreUint64(&bh.len, uint64(len(bh.items))) bh.st.clear() return out @@ -128,7 +127,7 @@ func (bh *BinHeap[T]) PeekPriority() int64 { bh.cond.L.Lock() defer bh.cond.L.Unlock() - if bh.Len() > 0 { + if len(bh.items) > 0 { return bh.items[0].Priority() } @@ -136,31 +135,20 @@ func (bh *BinHeap[T]) PeekPriority() int64 { } func (bh *BinHeap[T]) Len() uint64 { - return atomic.LoadUint64(&bh.len) + bh.cond.L.Lock() + defer bh.cond.L.Unlock() + return uint64(len(bh.items)) } func (bh *BinHeap[T]) Insert(item T) { bh.cond.L.Lock() - // check the binary heap len before insertion - if bh.Len() > bh.maxLen { - // unlock the mutex to proceed to get-max - bh.cond.L.Unlock() - - // signal waiting goroutines - for bh.Len() > 0 { - // signal waiting goroutines - bh.cond.Signal() - } - // lock mutex to proceed inserting into the empty slice - bh.cond.L.Lock() + for uint64(len(bh.items)) >= bh.maxLen { + bh.cond.Wait() } bh.items = append(bh.items, item) - // add len to the slice - atomic.AddUint64(&bh.len, 1) - // fix binary heap up bh.fixUp() @@ -177,22 +165,23 @@ func (bh *BinHeap[T]) ExtractMin() T { bh.cond.L.Lock() // if len == 0, wait for the signal - for bh.Len() == 0 { + for len(bh.items) == 0 { bh.cond.Wait() } - bh.swap(0, bh.len-1) + n := uint64(len(bh.items)) + bh.swap(0, n-1) - item := (bh.items)[int(bh.len)-1] //nolint:gosec - bh.items = (bh).items[0 : int(bh.len)-1] //nolint:gosec - bh.fixDown(0, int(bh.len-2)) //nolint:gosec - - // reduce len - atomic.AddUint64(&bh.len, ^uint64(0)) + item := bh.items[n-1] + bh.items = bh.items[:n-1] + bh.fixDown(0, int(n)-2) //nolint:gosec // remove item delete(bh.exists, item.ID()) bh.cond.L.Unlock() + + // signal blocked producers waiting for space + bh.cond.Signal() return item } diff --git a/binary_heap_test.go b/binary_heap_test.go index c821066..c16b354 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -289,14 +289,35 @@ func TestItemPeekConcurrent(t *testing.T) { go func() { defer wg.Done() for range 11 { - min := bh.ExtractMin() - _ = min + m := bh.ExtractMin() + _ = m } }() wg.Wait() } +func TestBinHeap_RemoveHeapProperty(t *testing.T) { + // Regression test: Remove must restore the heap property after compaction. + // Insert priorities [1(A), 3(B), 2(B)] → heap: [1, 3, 2] + // Remove group "A" → compacts to [3, 2] which violates min-heap + // Without re-heapify, ExtractMin would return 3 instead of 2. + bh := NewBinHeap[Item](10) + bh.Insert(NewTest(1, "A", "id1")) + bh.Insert(NewTest(3, "B", "id2")) + bh.Insert(NewTest(2, "B", "id3")) + + removed := bh.Remove("A") + require.Len(t, removed, 1) + require.Equal(t, "id1", removed[0].ID()) + + first := bh.ExtractMin() + assert.Equal(t, int64(2), first.Priority(), "expected min priority 2, got %d", first.Priority()) + + second := bh.ExtractMin() + assert.Equal(t, int64(3), second.Priority(), "expected priority 3, got %d", second.Priority()) +} + func TestBinHeap_Remove(t *testing.T) { a := []Item{ NewTest(2, "1", "101"), @@ -377,15 +398,320 @@ func TestExists(t *testing.T) { assert.False(t, bh.Exists(id)) } -func BenchmarkGeneral(b *testing.B) { +func TestBinHeap_RemoveHeapPropertyLarge(t *testing.T) { + bh := NewBinHeap[Item](200) + + // Insert 100 items across 5 groups with interleaved priorities so + // the target group's items are scattered at root, mid, and leaf heap levels. + for i := 0; i < 100; i++ { + groupID := fmt.Sprintf("g%d", i%5) + priority := int64(i + 1) // 1..100, round-robin across groups + id := fmt.Sprintf("item-%d", i) + bh.Insert(NewTest(priority, groupID, id)) + } + + require.Equal(t, uint64(100), bh.Len()) + + // Remove group "g2" (priorities 3,8,13,18,...,98 — 20 items) + removed := bh.Remove("g2") + require.Len(t, removed, 20) + for _, item := range removed { + require.Equal(t, "g2", item.GroupID()) + } + require.Equal(t, uint64(80), bh.Len()) + + // Extract all remaining items and verify strictly non-decreasing order + var prev int64 + for i := 0; i < 80; i++ { + item := bh.ExtractMin() + require.GreaterOrEqual(t, item.Priority(), prev, + "item %d: priority %d should be >= previous %d", i, item.Priority(), prev) + require.NotEqual(t, "g2", item.GroupID()) + prev = item.Priority() + } + + require.Equal(t, uint64(0), bh.Len()) +} + +func TestBinHeap_RemoveMultipleGroups(t *testing.T) { bh := NewBinHeap[Item](100) - id := uuid.NewString() - id2 := uuid.NewString() + // 4 groups with known priorities + bh.Insert(NewTest(10, "A", "a1")) + bh.Insert(NewTest(30, "A", "a2")) + bh.Insert(NewTest(5, "B", "b1")) + bh.Insert(NewTest(25, "B", "b2")) + bh.Insert(NewTest(15, "C", "c1")) + bh.Insert(NewTest(35, "C", "c2")) + bh.Insert(NewTest(20, "D", "d1")) + bh.Insert(NewTest(40, "D", "d2")) + + // Remove group A, verify min is B's 5 + removedA := bh.Remove("A") + require.Len(t, removedA, 2) + require.Equal(t, int64(5), bh.PeekPriority()) + + // Remove group B, verify min is now C's 15 + removedB := bh.Remove("B") + require.Len(t, removedB, 2) + require.Equal(t, int64(15), bh.PeekPriority()) + + // Extract remaining items (C and D) and verify order + expected := []int64{15, 20, 35, 40} + for _, exp := range expected { + item := bh.ExtractMin() + require.Equal(t, exp, item.Priority()) + } +} + +func TestBinHeap_RemoveEdgeCases(t *testing.T) { + t.Run("remove all items", func(t *testing.T) { + bh := NewBinHeap[Item](10) + bh.Insert(NewTest(1, "only", "id1")) + bh.Insert(NewTest(2, "only", "id2")) + bh.Insert(NewTest(3, "only", "id3")) + + removed := bh.Remove("only") + require.Len(t, removed, 3) + require.Equal(t, uint64(0), bh.Len()) + + // Insert new items and verify they work after full removal + bh.Insert(NewTest(42, "new", "id4")) + require.Equal(t, uint64(1), bh.Len()) + item := bh.ExtractMin() + require.Equal(t, int64(42), item.Priority()) + }) + + t.Run("remove non-existent group", func(t *testing.T) { + bh := NewBinHeap[Item](10) + bh.Insert(NewTest(1, "exists", "id1")) + bh.Insert(NewTest(2, "exists", "id2")) + + removed := bh.Remove("ghost") + require.Empty(t, removed) + require.Equal(t, uint64(2), bh.Len()) + + // Verify heap still works correctly + item := bh.ExtractMin() + require.Equal(t, int64(1), item.Priority()) + }) + + t.Run("remove from empty heap", func(t *testing.T) { + bh := NewBinHeap[Item](10) + removed := bh.Remove("anything") + require.Empty(t, removed) + require.Equal(t, uint64(0), bh.Len()) + }) +} + +func TestBinHeap_BoundedInsertBackpressure(t *testing.T) { + bh := NewBinHeap[Item](5) + + // Fill to capacity + for i := 0; i < 5; i++ { + bh.Insert(NewTest(int64(i+1), "g1", fmt.Sprintf("item-%d", i))) + } + require.Equal(t, uint64(5), bh.Len()) + + // Launch goroutine to insert one more (should block at capacity) + inserted := make(chan struct{}) + go func() { + bh.Insert(NewTest(10, "g1", "blocked-item")) + close(inserted) + }() + + // Give goroutine time to start and block on the full heap + time.Sleep(100 * time.Millisecond) + require.Equal(t, uint64(5), bh.Len(), "producer should be blocked, heap still at capacity") + + // Extract one item to free space and signal the blocked producer + item := bh.ExtractMin() + require.Equal(t, int64(1), item.Priority()) + + // Wait for insert goroutine to complete + select { + case <-inserted: + // success — producer unblocked + case <-time.After(2 * time.Second): + t.Fatal("insert goroutine did not unblock after ExtractMin") + } + + require.Equal(t, uint64(5), bh.Len(), "should be 5: was 5, extracted 1, inserted 1") +} + +func TestBinHeap_ConcurrentInsertRemoveExtract(t *testing.T) { + // Large capacity to avoid Insert back-pressure during stress test; + // back-pressure is tested separately in TestBinHeap_BoundedInsertBackpressure. + bh := NewBinHeap[Item](100_000) + + var done atomic.Bool + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + + // 3 producer goroutines inserting items with random priorities across 10 groups + for p := 0; p < 3; p++ { + producerWg.Add(1) + go func(id int) { + defer producerWg.Done() + for i := 0; !done.Load(); i++ { + groupID := fmt.Sprintf("g%d", i%10) + itemID := fmt.Sprintf("p%d-i%d", id, i) + bh.Insert(NewTest(rand.Int63n(1000), groupID, itemID)) //nolint:gosec + } + }(p) + } + + // 2 consumer goroutines calling ExtractMin + for c := 0; c < 2; c++ { + consumerWg.Add(1) + go func() { + defer consumerWg.Done() + for !done.Load() { + _ = bh.ExtractMin() + } + }() + } + + // 1 remover goroutine periodically removing a random group + producerWg.Add(1) + go func() { + defer producerWg.Done() + for !done.Load() { + bh.Remove(fmt.Sprintf("g%d", rand.Intn(10))) //nolint:gosec + time.Sleep(10 * time.Millisecond) + } + }() + + // Run for 2 seconds + time.Sleep(2 * time.Second) + done.Store(true) + + // Wait for producers and remover to finish + producerWg.Wait() + + // Unblock consumers that may be stuck waiting on an empty heap + consumerDone := make(chan struct{}) + go func() { + consumerWg.Wait() + close(consumerDone) + }() + + for { + select { + case <-consumerDone: + // All consumers exited — verify heap is in a consistent state + _ = bh.Len() + return + default: + bh.Insert(NewTest(0, "sentinel", fmt.Sprintf("s-%d", time.Now().UnixNano()))) + time.Sleep(10 * time.Millisecond) + } + } +} + +func TestBinHeap_LargeScaleOrdering(t *testing.T) { + const n = 10_000 + bh := NewBinHeap[Item](uint64(n) + 1) + + for i := 0; i < n; i++ { + bh.Insert(NewTest(rand.Int63n(1000), "g", fmt.Sprintf("item-%d", i))) //nolint:gosec + } + + var prev int64 + for i := 0; i < n; i++ { + item := bh.ExtractMin() + require.GreaterOrEqual(t, item.Priority(), prev, + "item %d: priority %d should be >= previous %d", i, item.Priority(), prev) + prev = item.Priority() + } +} + +func BenchmarkInsert(b *testing.B) { + bh := NewBinHeap[Item](1 << 30) + b.ReportAllocs() + i := 0 + for b.Loop() { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("b-%d", i))) //nolint:gosec + i++ + } +} + +func BenchmarkExtractMin(b *testing.B) { + bh := NewBinHeap[Item](uint64(max(b.N, 0)) + 1) + for i := range b.N { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("b-%d", i))) //nolint:gosec + } b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + bh.ExtractMin() + } +} +func BenchmarkInsertExtractMin(b *testing.B) { + bh := NewBinHeap[Item](2000) + // Pre-fill with 1000 items + for i := 0; i < 1000; i++ { + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("pre-%d", i))) //nolint:gosec + } + b.ReportAllocs() + b.ResetTimer() + i := 0 for b.Loop() { - bh.Insert(NewTest(2, id, id2)) - bh.Remove(id) + bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("b-%d", i))) //nolint:gosec + bh.ExtractMin() + i++ + } +} + +func BenchmarkRemove(b *testing.B) { + const numGroups = 100 + const itemsPerGroup = 10 + bh := NewBinHeap[Item](numGroups*itemsPerGroup + 100) + + // Fill with 1000 items across 100 groups (10 items each) + groups := make([][]Item, numGroups) + for g := 0; g < numGroups; g++ { + groups[g] = make([]Item, 0, itemsPerGroup) + for i := 0; i < itemsPerGroup; i++ { + item := NewTest(rand.Int63n(10000), fmt.Sprintf("g%d", g), fmt.Sprintf("g%d-i%d", g, i)) //nolint:gosec + bh.Insert(item) + groups[g] = append(groups[g], item) + } + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + groupIdx := i % numGroups + groupID := fmt.Sprintf("g%d", groupIdx) + bh.Remove(groupID) + // Restore items for next iteration + b.StopTimer() + for _, item := range groups[groupIdx] { + bh.Insert(item) + } + b.StartTimer() } } + +func BenchmarkConcurrentInsertExtract(b *testing.B) { + bh := NewBinHeap[Item](10000) + // Pre-fill so ExtractMin rarely blocks + for i := 0; i < 5000; i++ { + bh.Insert(NewTest(rand.Int63n(10000), "bench", fmt.Sprintf("pre-%d", i))) //nolint:gosec + } + b.ReportAllocs() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + if i%2 == 0 { + bh.Insert(NewTest(rand.Int63n(10000), "bench", fmt.Sprintf("p-%d-%d", i, rand.Int63()))) //nolint:gosec + } else { + bh.ExtractMin() + } + i++ + } + }) +} diff --git a/monotonic_stack.go b/monotonic_stack.go index 3094b53..8759a7a 100644 --- a/monotonic_stack.go +++ b/monotonic_stack.go @@ -45,5 +45,5 @@ func (st *stack) Indices() [][2]int { } func (st *stack) clear() { - st.idx = make([][2]int, 0, 10) + st.idx = st.idx[:0] } From 1f7207bda9def3949714a18bf6ee8cc50da2492b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 11:53:26 +0100 Subject: [PATCH 02/10] chore: fix remove edge case Signed-off-by: Valery Piashchynski --- binary_heap.go | 1 + binary_heap_test.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/binary_heap.go b/binary_heap.go index cd81c58..4ace477 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -118,6 +118,7 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { } bh.st.clear() + bh.cond.Broadcast() return out } diff --git a/binary_heap_test.go b/binary_heap_test.go index c16b354..8996435 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -539,6 +539,42 @@ func TestBinHeap_BoundedInsertBackpressure(t *testing.T) { require.Equal(t, uint64(5), bh.Len(), "should be 5: was 5, extracted 1, inserted 1") } +func TestBinHeap_RemoveUnblocksInsert(t *testing.T) { + bh := NewBinHeap[Item](5) + + // Fill to capacity with one removable group + for i := 0; i < 5; i++ { + bh.Insert(NewTest(int64(i+1), "removeMe", fmt.Sprintf("item-%d", i))) + } + require.Equal(t, uint64(5), bh.Len()) + + // Launch goroutine to insert one more (should block at capacity) + inserted := make(chan struct{}) + go func() { + bh.Insert(NewTest(99, "keep", "new-item")) + close(inserted) + }() + + // Give goroutine time to start and block + time.Sleep(100 * time.Millisecond) + require.Equal(t, uint64(5), bh.Len(), "producer should be blocked, heap still at capacity") + + // Remove group to free space — this should unblock the producer + removed := bh.Remove("removeMe") + require.Len(t, removed, 5) + + select { + case <-inserted: + // success — producer unblocked by Remove + case <-time.After(2 * time.Second): + t.Fatal("insert goroutine did not unblock after Remove freed space") + } + + require.Equal(t, uint64(1), bh.Len()) + item := bh.ExtractMin() + require.Equal(t, int64(99), item.Priority()) +} + func TestBinHeap_ConcurrentInsertRemoveExtract(t *testing.T) { // Large capacity to avoid Insert back-pressure during stress test; // back-pressure is tested separately in TestBinHeap_BoundedInsertBackpressure. From 85fc1f2bf15d9fe5049ec577c8c06a401dabdf18 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 12:11:43 +0100 Subject: [PATCH 03/10] chore: fix inconsistences Signed-off-by: Valery Piashchynski --- binary_heap.go | 6 ++---- binary_heap_test.go | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/binary_heap.go b/binary_heap.go index 4ace477..641eed0 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -156,10 +156,9 @@ func (bh *BinHeap[T]) Insert(item T) { // add item bh.exists[item.ID()] = struct{}{} - bh.cond.L.Unlock() - // signal the goroutine on wait bh.cond.Signal() + bh.cond.L.Unlock() } func (bh *BinHeap[T]) ExtractMin() T { @@ -180,9 +179,8 @@ func (bh *BinHeap[T]) ExtractMin() T { // remove item delete(bh.exists, item.ID()) - bh.cond.L.Unlock() - // signal blocked producers waiting for space bh.cond.Signal() + bh.cond.L.Unlock() return item } diff --git a/binary_heap_test.go b/binary_heap_test.go index 8996435..0afdb7f 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -349,7 +349,7 @@ func TestBinHeap_Remove(t *testing.T) { out := bh.Remove("1") if len(out) != 6 { - t.Fatal("should be 5") + t.Fatalf("expected 6, got %d", len(out)) } for i := range out { From a6b5e53840137c4ac7a2cbabaeb90f3592cad910 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 12:25:10 +0100 Subject: [PATCH 04/10] chore: small fixes in tests Signed-off-by: Valery Piashchynski --- binary_heap_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binary_heap_test.go b/binary_heap_test.go index 0afdb7f..fffc43f 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -578,7 +578,7 @@ func TestBinHeap_RemoveUnblocksInsert(t *testing.T) { func TestBinHeap_ConcurrentInsertRemoveExtract(t *testing.T) { // Large capacity to avoid Insert back-pressure during stress test; // back-pressure is tested separately in TestBinHeap_BoundedInsertBackpressure. - bh := NewBinHeap[Item](100_000) + bh := NewBinHeap[Item](^uint64(0)) var done atomic.Bool var producerWg sync.WaitGroup From 9609c1d8e5ad750fd690085737facc71e984986b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 12:40:27 +0100 Subject: [PATCH 05/10] chore: go 1.26 code modernization Signed-off-by: Valery Piashchynski --- binary_heap.go | 4 +- binary_heap_test.go | 275 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 234 insertions(+), 45 deletions(-) diff --git a/binary_heap.go b/binary_heap.go index 641eed0..bfd7a05 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -157,7 +157,7 @@ func (bh *BinHeap[T]) Insert(item T) { bh.exists[item.ID()] = struct{}{} // signal the goroutine on wait - bh.cond.Signal() + bh.cond.Broadcast() bh.cond.L.Unlock() } @@ -180,7 +180,7 @@ func (bh *BinHeap[T]) ExtractMin() T { delete(bh.exists, item.ID()) // signal blocked producers waiting for space - bh.cond.Signal() + bh.cond.Broadcast() bh.cond.L.Unlock() return item } diff --git a/binary_heap_test.go b/binary_heap_test.go index fffc43f..2b42e51 100644 --- a/binary_heap_test.go +++ b/binary_heap_test.go @@ -121,8 +121,8 @@ func TestBinHeap_MaxLen(t *testing.T) { } func TestNewPriorityQueue(t *testing.T) { - insertsPerSec := uint64(0) - getPerSec := uint64(0) + var insertsPerSec atomic.Uint64 + var getPerSec atomic.Uint64 stopCh := make(chan struct{}, 1) pq := NewBinHeap[Item](1000) @@ -144,10 +144,10 @@ func TestNewPriorityQueue(t *testing.T) { for { select { case <-tt.C: - fmt.Printf("Insert per second: %d\n", atomic.LoadUint64(&insertsPerSec)) - atomic.StoreUint64(&insertsPerSec, 0) - fmt.Printf("ExtractMin per second: %d\n", atomic.LoadUint64(&getPerSec)) - atomic.StoreUint64(&getPerSec, 0) + fmt.Printf("Insert per second: %d\n", insertsPerSec.Load()) + insertsPerSec.Store(0) + fmt.Printf("ExtractMin per second: %d\n", getPerSec.Load()) + getPerSec.Store(0) case <-stopCh: tt.Stop() return @@ -162,7 +162,7 @@ func TestNewPriorityQueue(t *testing.T) { return default: pq.ExtractMin() - atomic.AddUint64(&getPerSec, 1) + getPerSec.Add(1) } } }() @@ -174,7 +174,7 @@ func TestNewPriorityQueue(t *testing.T) { return default: pq.Insert(NewTest(rand.Int63(), uuid.NewString(), uuid.NewString())) //nolint:gosec - atomic.AddUint64(&insertsPerSec, 1) + insertsPerSec.Add(1) } } }() @@ -276,23 +276,20 @@ func TestItemPeekConcurrent(t *testing.T) { bh.Insert(a[i]) } - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - defer wg.Done() + var wg sync.WaitGroup + wg.Go(func() { for range 1000 { tmp := bh.PeekPriority() _ = tmp } - }() + }) - go func() { - defer wg.Done() + wg.Go(func() { for range 11 { m := bh.ExtractMin() _ = m } - }() + }) wg.Wait() } @@ -403,7 +400,7 @@ func TestBinHeap_RemoveHeapPropertyLarge(t *testing.T) { // Insert 100 items across 5 groups with interleaved priorities so // the target group's items are scattered at root, mid, and leaf heap levels. - for i := 0; i < 100; i++ { + for i := range 100 { groupID := fmt.Sprintf("g%d", i%5) priority := int64(i + 1) // 1..100, round-robin across groups id := fmt.Sprintf("item-%d", i) @@ -422,7 +419,7 @@ func TestBinHeap_RemoveHeapPropertyLarge(t *testing.T) { // Extract all remaining items and verify strictly non-decreasing order var prev int64 - for i := 0; i < 80; i++ { + for i := range 80 { item := bh.ExtractMin() require.GreaterOrEqual(t, item.Priority(), prev, "item %d: priority %d should be >= previous %d", i, item.Priority(), prev) @@ -508,7 +505,7 @@ func TestBinHeap_BoundedInsertBackpressure(t *testing.T) { bh := NewBinHeap[Item](5) // Fill to capacity - for i := 0; i < 5; i++ { + for i := range 5 { bh.Insert(NewTest(int64(i+1), "g1", fmt.Sprintf("item-%d", i))) } require.Equal(t, uint64(5), bh.Len()) @@ -543,7 +540,7 @@ func TestBinHeap_RemoveUnblocksInsert(t *testing.T) { bh := NewBinHeap[Item](5) // Fill to capacity with one removable group - for i := 0; i < 5; i++ { + for i := range 5 { bh.Insert(NewTest(int64(i+1), "removeMe", fmt.Sprintf("item-%d", i))) } require.Equal(t, uint64(5), bh.Len()) @@ -585,38 +582,32 @@ func TestBinHeap_ConcurrentInsertRemoveExtract(t *testing.T) { var consumerWg sync.WaitGroup // 3 producer goroutines inserting items with random priorities across 10 groups - for p := 0; p < 3; p++ { - producerWg.Add(1) - go func(id int) { - defer producerWg.Done() + for p := range 3 { + producerWg.Go(func() { for i := 0; !done.Load(); i++ { groupID := fmt.Sprintf("g%d", i%10) - itemID := fmt.Sprintf("p%d-i%d", id, i) + itemID := fmt.Sprintf("p%d-i%d", p, i) bh.Insert(NewTest(rand.Int63n(1000), groupID, itemID)) //nolint:gosec } - }(p) + }) } // 2 consumer goroutines calling ExtractMin - for c := 0; c < 2; c++ { - consumerWg.Add(1) - go func() { - defer consumerWg.Done() + for range 2 { + consumerWg.Go(func() { for !done.Load() { _ = bh.ExtractMin() } - }() + }) } // 1 remover goroutine periodically removing a random group - producerWg.Add(1) - go func() { - defer producerWg.Done() + producerWg.Go(func() { for !done.Load() { bh.Remove(fmt.Sprintf("g%d", rand.Intn(10))) //nolint:gosec time.Sleep(10 * time.Millisecond) } - }() + }) // Run for 2 seconds time.Sleep(2 * time.Second) @@ -649,12 +640,12 @@ func TestBinHeap_LargeScaleOrdering(t *testing.T) { const n = 10_000 bh := NewBinHeap[Item](uint64(n) + 1) - for i := 0; i < n; i++ { + for i := range n { bh.Insert(NewTest(rand.Int63n(1000), "g", fmt.Sprintf("item-%d", i))) //nolint:gosec } var prev int64 - for i := 0; i < n; i++ { + for i := range n { item := bh.ExtractMin() require.GreaterOrEqual(t, item.Priority(), prev, "item %d: priority %d should be >= previous %d", i, item.Priority(), prev) @@ -662,6 +653,202 @@ func TestBinHeap_LargeScaleOrdering(t *testing.T) { } } +// TestBinHeap_BroadcastPreventsDeadlock. +// +// With Signal(), the following deadlock is possible (maxLen=1): +// 1. Queue has 1 item. P1, P2 call Insert → both Wait() (queue full). +// 2. C1 calls ExtractMin → extracts the item, Signal() wakes P1. +// 3. C2 calls ExtractMin → acquires lock before P1, sees empty → Wait(). +// 4. P1 wakes, inserts item, Signal() → wakes P2 (wrong type!), not C2. +// 5. P2 rechecks: queue full → Wait(). +// 6. Deadlock: P2 and C2 both waiting, queue has 1 item, no wakeup coming. +// +// With Broadcast(), step 4 wakes both P2 and C2, so C2 rechecks, finds 1 item, +// and successfully extracts — no deadlock. +func TestBinHeap_BroadcastPreventsDeadlock(t *testing.T) { + t.Run("maxLen=1 tight contention", func(t *testing.T) { + const numProducers = 4 + const numConsumers = 4 + const itemsPerProducer = 500 + + bh := NewBinHeap[Item](1) + + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + var consumed atomic.Int64 + + totalItems := int64(numProducers * itemsPerProducer) + + // Launch producers: each inserts itemsPerProducer items then exits. + for p := range numProducers { + producerWg.Go(func() { + for i := range itemsPerProducer { + bh.Insert(NewTest(int64(i), fmt.Sprintf("p%d", p), fmt.Sprintf("p%d-i%d", p, i))) + } + }) + } + + // Launch consumers: each extracts until totalItems have been consumed. + for range numConsumers { + consumerWg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + // Watchdog: if the test doesn't finish within the timeout, it's a deadlock. + done := make(chan struct{}) + go func() { + producerWg.Wait() + consumerWg.Wait() + close(done) + }() + + select { + case <-done: + // success — no deadlock + case <-time.After(10 * time.Second): + t.Fatal("DEADLOCK: producers and consumers did not complete within 10s (missed wakeup)") + } + }) + + t.Run("maxLen=2 multiple rounds", func(t *testing.T) { + // A slightly larger capacity still triggers the bug with Signal() + // because producers and consumers can both be queued on the same cond. + const numProducers = 6 + const numConsumers = 6 + const itemsPerProducer = 300 + + bh := NewBinHeap[Item](2) + + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + var consumed atomic.Int64 + + totalItems := int64(numProducers * itemsPerProducer) + + for p := range numProducers { + producerWg.Go(func() { + for i := range itemsPerProducer { + bh.Insert(NewTest(int64(i%50), fmt.Sprintf("p%d", p), fmt.Sprintf("p%d-i%d", p, i))) + } + }) + } + + for range numConsumers { + consumerWg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + done := make(chan struct{}) + go func() { + producerWg.Wait() + consumerWg.Wait() + close(done) + }() + + select { + case <-done: + // success + case <-time.After(10 * time.Second): + t.Fatal("DEADLOCK: producers and consumers did not complete within 10s (missed wakeup)") + } + }) + + t.Run("asymmetric producers/consumers", func(t *testing.T) { + // Many producers, few consumers — maximizes the chance that Signal() + // wakes a producer instead of the sole consumer. + const numProducers = 8 + const numConsumers = 1 + const itemsPerProducer = 200 + + bh := NewBinHeap[Item](1) + + var producerWg sync.WaitGroup + var consumerWg sync.WaitGroup + var consumed atomic.Int64 + + totalItems := int64(numProducers * itemsPerProducer) + + for p := range numProducers { + producerWg.Go(func() { + for i := range itemsPerProducer { + bh.Insert(NewTest(int64(i), fmt.Sprintf("p%d", p), fmt.Sprintf("p%d-i%d", p, i))) + } + }) + } + + for range numConsumers { + consumerWg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + done := make(chan struct{}) + go func() { + producerWg.Wait() + consumerWg.Wait() + close(done) + }() + + select { + case <-done: + // success + case <-time.After(10 * time.Second): + t.Fatal("DEADLOCK: producers and consumers did not complete within 10s (missed wakeup)") + } + }) + + t.Run("repeated stress cycles", func(t *testing.T) { + // Run multiple short cycles to increase the probability of hitting + // the specific interleaving that causes a missed wakeup. + for cycle := range 20 { + bh := NewBinHeap[Item](1) + const numGoroutines = 4 + const itemsEach = 100 + + var wg sync.WaitGroup + var consumed atomic.Int64 + totalItems := int64(numGoroutines * itemsEach) + + for g := range numGoroutines { + wg.Go(func() { + for i := range itemsEach { + bh.Insert(NewTest(int64(i), "g", fmt.Sprintf("c%d-g%d-i%d", cycle, g, i))) + } + }) + } + + for range numGoroutines { + wg.Go(func() { + for consumed.Add(1) <= totalItems { + _ = bh.ExtractMin() + } + }) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // cycle passed + case <-time.After(5 * time.Second): + t.Fatalf("DEADLOCK on cycle %d: did not complete within 5s", cycle) + } + } + }) +} + func BenchmarkInsert(b *testing.B) { bh := NewBinHeap[Item](1 << 30) b.ReportAllocs() @@ -679,7 +866,7 @@ func BenchmarkExtractMin(b *testing.B) { } b.ReportAllocs() b.ResetTimer() - for i := 0; i < b.N; i++ { + for range b.N { bh.ExtractMin() } } @@ -687,7 +874,7 @@ func BenchmarkExtractMin(b *testing.B) { func BenchmarkInsertExtractMin(b *testing.B) { bh := NewBinHeap[Item](2000) // Pre-fill with 1000 items - for i := 0; i < 1000; i++ { + for i := range 1000 { bh.Insert(NewTest(rand.Int63n(100000), "bench", fmt.Sprintf("pre-%d", i))) //nolint:gosec } b.ReportAllocs() @@ -707,9 +894,9 @@ func BenchmarkRemove(b *testing.B) { // Fill with 1000 items across 100 groups (10 items each) groups := make([][]Item, numGroups) - for g := 0; g < numGroups; g++ { + for g := range numGroups { groups[g] = make([]Item, 0, itemsPerGroup) - for i := 0; i < itemsPerGroup; i++ { + for i := range itemsPerGroup { item := NewTest(rand.Int63n(10000), fmt.Sprintf("g%d", g), fmt.Sprintf("g%d-i%d", g, i)) //nolint:gosec bh.Insert(item) groups[g] = append(groups[g], item) @@ -718,7 +905,8 @@ func BenchmarkRemove(b *testing.B) { b.ReportAllocs() b.ResetTimer() - for i := 0; i < b.N; i++ { + i := 0 + for b.Loop() { groupIdx := i % numGroups groupID := fmt.Sprintf("g%d", groupIdx) bh.Remove(groupID) @@ -728,13 +916,14 @@ func BenchmarkRemove(b *testing.B) { bh.Insert(item) } b.StartTimer() + i++ } } func BenchmarkConcurrentInsertExtract(b *testing.B) { bh := NewBinHeap[Item](10000) // Pre-fill so ExtractMin rarely blocks - for i := 0; i < 5000; i++ { + for i := range 5000 { bh.Insert(NewTest(rand.Int63n(10000), "bench", fmt.Sprintf("pre-%d", i))) //nolint:gosec } b.ReportAllocs() From 8aeb926aa9ce3a1cf06608b212b17cfb2578affc Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 12:58:45 +0100 Subject: [PATCH 06/10] chore: fix codecov Signed-off-by: Valery Piashchynski --- .github/workflows/linux.yml | 20 +++++++++++++------- codecov.yml | 12 ++++++++++++ 2 files changed, 25 insertions(+), 7 deletions(-) create mode 100644 codecov.yml diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 0dad180..0e3d1c5 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -34,7 +34,7 @@ jobs: - name: Run golang tests with coverage run: | mkdir ./coverage - go test -v -race -cover -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./... + go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./... - name: Archive code coverage results uses: actions/upload-artifact@v6 @@ -50,17 +50,23 @@ jobs: timeout-minutes: 60 steps: + - name: Check out code + uses: actions/checkout@v6 + - name: Download code coverage results uses: actions/download-artifact@v7 - - run: | - cd pq-ubuntu-latest + with: + name: pq-ubuntu-latest + path: ./coverage + + - name: Prepare coverage report + run: | + cd ./coverage echo 'mode: atomic' > summary.txt - tail -q -n +2 *.out >> summary.txt - sed -i '2,${/roadrunner/!d}' summary.txt + tail -q -n +2 *.out | awk '/github\.com\/roadrunner-server\/priority_queue/' >> summary.txt - - name: upload to codecov + - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 # Docs: with: - token: ${{ secrets.CODECOV_TOKEN }} files: ./coverage/summary.txt fail_ci_if_error: false diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..d57cc68 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,12 @@ +coverage: + status: + project: + default: + target: auto + threshold: 50% + informational: true + patch: + default: + target: auto + threshold: 50% + informational: true From e4e1f840128e6c6f740f6bad85c693ba402b7730 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 16:13:37 +0000 Subject: [PATCH 07/10] chore(deps): bump actions/download-artifact from 7 to 8 Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 7 to 8. - [Release notes](https://github.com/actions/download-artifact/releases) - [Commits](https://github.com/actions/download-artifact/compare/v7...v8) --- updated-dependencies: - dependency-name: actions/download-artifact dependency-version: '8' dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/linux.yml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 0e3d1c5..e592326 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -54,10 +54,12 @@ jobs: uses: actions/checkout@v6 - name: Download code coverage results - uses: actions/download-artifact@v7 - with: - name: pq-ubuntu-latest - path: ./coverage + uses: actions/download-artifact@v8 + - run: | + cd pq-ubuntu-latest + echo 'mode: atomic' > summary.txt + tail -q -n +2 *.out >> summary.txt + sed -i '2,${/roadrunner/!d}' summary.txt - name: Prepare coverage report run: | From 6ab603228870899afeb51eb539a862faf332afae Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 27 Feb 2026 16:13:41 +0000 Subject: [PATCH 08/10] chore(deps): bump actions/upload-artifact from 6 to 7 Bumps [actions/upload-artifact](https://github.com/actions/upload-artifact) from 6 to 7. - [Release notes](https://github.com/actions/upload-artifact/releases) - [Commits](https://github.com/actions/upload-artifact/compare/v6...v7) --- updated-dependencies: - dependency-name: actions/upload-artifact dependency-version: '7' dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/linux.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index e592326..9c52635 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -37,7 +37,7 @@ jobs: go test -v -race -cover -tags=debug -coverpkg=./... -coverprofile=./coverage/pq-${{ matrix.os }}.out -covermode=atomic ./... - name: Archive code coverage results - uses: actions/upload-artifact@v6 + uses: actions/upload-artifact@v7 with: name: pq-${{ matrix.os }} path: ./coverage/pq-${{ matrix.os }}.out From 251738bc5da6a37e424c69b2f33662e6cb7eb51b Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 13:09:15 +0100 Subject: [PATCH 09/10] fix: codecov ci Signed-off-by: Valery Piashchynski --- .github/workflows/linux.yml | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 9c52635..cb833fe 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -55,20 +55,25 @@ jobs: - name: Download code coverage results uses: actions/download-artifact@v8 - - run: | - cd pq-ubuntu-latest - echo 'mode: atomic' > summary.txt - tail -q -n +2 *.out >> summary.txt - sed -i '2,${/roadrunner/!d}' summary.txt + with: + name: pq-ubuntu-latest + path: coverage - name: Prepare coverage report run: | - cd ./coverage echo 'mode: atomic' > summary.txt - tail -q -n +2 *.out | awk '/github\.com\/roadrunner-server\/priority_queue/' >> summary.txt + tail -q -n +2 coverage/*.out >> summary.txt + awk ' + NR == 1 { print; next } + /^github\.com\/roadrunner-server\/priority_queue\// { + sub(/^github\.com\/roadrunner-server\/priority_queue\//, "", $0) + print + } + ' summary.txt > summary.filtered.txt + mv summary.filtered.txt summary.txt - name: Upload coverage to Codecov - uses: codecov/codecov-action@v5 # Docs: + uses: codecov/codecov-action@v5 with: - files: ./coverage/summary.txt + files: summary.txt fail_ci_if_error: false From 48f6cbc3d146f1b2e585c66f8600e69b7cc2e0b4 Mon Sep 17 00:00:00 2001 From: Valery Piashchynski Date: Sun, 8 Mar 2026 13:19:37 +0100 Subject: [PATCH 10/10] fix: clear removed items Signed-off-by: Valery Piashchynski --- binary_heap.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/binary_heap.go b/binary_heap.go index bfd7a05..83cb65d 100644 --- a/binary_heap.go +++ b/binary_heap.go @@ -101,6 +101,8 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { } } + oldLen := len(bh.items) + ids := bh.st.Indices() adjustment := 0 for i := range ids { @@ -111,6 +113,9 @@ func (bh *BinHeap[T]) Remove(groupID string) []T { adjustment += end - start + 1 } + // Zero freed tail slots to allow GC of removed items + clear(bh.items[len(bh.items):oldLen]) + // re-heapify after compaction (Floyd's algorithm) n := len(bh.items) for i := n/2 - 1; i >= 0; i-- { @@ -173,6 +178,8 @@ func (bh *BinHeap[T]) ExtractMin() T { bh.swap(0, n-1) item := bh.items[n-1] + var zero T + bh.items[n-1] = zero bh.items = bh.items[:n-1] bh.fixDown(0, int(n)-2) //nolint:gosec