Skip to content

Commit

Permalink
chore: applying 1.6.3 hotfixes to main branch (#3061)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Mar 1, 2023
1 parent f936260 commit ade2e4d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 6 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## [1.6.3](https://github.com/rudderlabs/rudder-server/compare/v1.6.2...v1.6.3) (2023-03-01)


### Bug Fixes

* limiter deadlock while trying to notify a dynamic priority item ([#3056](https://github.com/rudderlabs/rudder-server/issues/3056)) ([5f967dc](https://github.com/rudderlabs/rudder-server/commit/5f967dc77a2e14338e8a7a79e60dd705cb2bc213))

## [1.6.2](https://github.com/rudderlabs/rudder-server/compare/v1.6.1...v1.6.2) (2023-02-28)


Expand Down
3 changes: 3 additions & 0 deletions utils/queue/priorityQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (pq *PriorityQueue[T]) GetIndex(x interface{}) int {

// Update updates the attributes of an element in the priority queue.
func (pq *PriorityQueue[T]) Update(item *Item[T], priority int) {
if item.index == -1 {
return
}
item.Priority = priority
heap.Fix(pq, item.index)
}
Expand Down
15 changes: 15 additions & 0 deletions utils/queue/priorityQueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,19 @@ func TestPriorityQueue(t *testing.T) {
require.Nil(t, pq.Pop())
require.Equal(t, 0, pq.Len())
})

t.Run("pop then try to update", func(t *testing.T) {
pq := make(PriorityQueue[any], 3)

for i := 0; i < 3; i++ {
pq[i] = &Item[any]{
Priority: 1,
timeStamp: int64(i),
}
}
i1 := pq.Pop().((*Item[any])) // remove the item
require.Len(t, pq, 2, "pq should have 2 elements after pop")
pq.Update(i1, i1.Priority+1) // try to update the removed item
require.Len(t, pq, 2, "pq should still have 2 elements after updating the popped item")
})
}
15 changes: 9 additions & 6 deletions utils/sync/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ func NewLimiter(ctx context.Context, wg *sync.WaitGroup, name string, limit int,
type limiter struct {
name string
limit int
count int
mu sync.Mutex
waitList queue.PriorityQueue[chan struct{}]
dynamicPeriod time.Duration
stats struct {

mu sync.Mutex // protects count and waitList below
count int
waitList queue.PriorityQueue[chan struct{}]

stats struct {
triggerFunc func() <-chan time.Time
stat stats.Stats
waitGauge stats.Measurement // gauge showing number of operations waiting in the queue
Expand Down Expand Up @@ -136,14 +138,15 @@ func (l *limiter) BeginWithPriority(key string, priority LimiterPriorityValue) (
end = func() {
defer l.stats.stat.NewTaggedStat(l.name+"_limiter_working", stats.TimerType, stats.Tags{"key": key}).Since(start)
l.mu.Lock()
defer l.mu.Unlock()
l.count--
if len(l.waitList) == 0 {
l.mu.Unlock()
return
}
next := heap.Pop(&l.waitList).(*queue.Item[chan struct{}])
next.Value <- struct{}{}
l.count++
l.mu.Unlock()
next.Value <- struct{}{}
close(next.Value)
}
return end
Expand Down

0 comments on commit ade2e4d

Please sign in to comment.