Skip to content

Commit

Permalink
fix block
Browse files Browse the repository at this point in the history
Signed-off-by: lance6716 <lance6716@gmail.com>
  • Loading branch information
lance6716 committed Apr 12, 2023
1 parent 8836a02 commit 45b208d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 32 deletions.
78 changes: 48 additions & 30 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,16 +710,22 @@ func (h *regionJobRetryHeap) Pop() any {
// back later, and put back when the regionJob.waitUntil is reached. It maintains
// a heap of jobs internally based on the regionJob.waitUntil field.
type regionJobRetryer struct {
protected struct {
// lock acquiring order: protectedClosed > protectedQueue > protectedToPutBack
protectedClosed struct {
mu sync.Mutex
closed bool
}
protectedQueue struct {
mu sync.Mutex
q regionJobRetryHeap
}
protectedToPutBack struct {
mu sync.Mutex
q regionJobRetryHeap
toPutBack *regionJob
}
putBackCh chan<- *regionJob
reload chan struct{}
done chan struct{}
closed bool
closeMu sync.Mutex
}

func newRegionJobRetryer(putBackCh chan<- *regionJob) *regionJobRetryer {
Expand All @@ -728,18 +734,18 @@ func newRegionJobRetryer(putBackCh chan<- *regionJob) *regionJobRetryer {
reload: make(chan struct{}, 1),
done: make(chan struct{}),
}
ret.protected.q = make(regionJobRetryHeap, 0, 16)
ret.protectedQueue.q = make(regionJobRetryHeap, 0, 16)
return ret
}

func (q *regionJobRetryer) run(ctx context.Context) {
for {
var front *regionJob
q.protected.mu.Lock()
if len(q.protected.q) > 0 {
front = q.protected.q[0]
q.protectedQueue.mu.Lock()
if len(q.protectedQueue.q) > 0 {
front = q.protectedQueue.q[0]
}
q.protected.mu.Unlock()
q.protectedQueue.mu.Unlock()

switch {
case front != nil:
Expand All @@ -750,18 +756,24 @@ func (q *regionJobRetryer) run(ctx context.Context) {
return
case <-q.reload:
case <-time.After(time.Until(front.waitUntil)):
q.protected.mu.Lock()
q.protected.toPutBack = heap.Pop(&q.protected.q).(*regionJob)
q.protectedQueue.mu.Lock()
q.protectedToPutBack.mu.Lock()
q.protectedToPutBack.toPutBack = heap.Pop(&q.protectedQueue.q).(*regionJob)
// release the lock of queue to avoid blocking regionJobRetryer.push
q.protectedQueue.mu.Unlock()

// hold the lock of toPutBack to make sending to putBackCh and
// resetting toPutBack atomic w.r.t. regionJobRetryer.close
select {
case <-ctx.Done():
q.protected.mu.Unlock()
q.protectedToPutBack.mu.Unlock()
return
case <-q.done:
q.protected.mu.Unlock()
q.protectedToPutBack.mu.Unlock()
return
case q.putBackCh <- q.protected.toPutBack:
q.protected.toPutBack = nil
q.protected.mu.Unlock()
case q.putBackCh <- q.protectedToPutBack.toPutBack:
q.protectedToPutBack.toPutBack = nil
q.protectedToPutBack.mu.Unlock()
}
}
default:
Expand All @@ -777,16 +789,17 @@ func (q *regionJobRetryer) run(ctx context.Context) {
}
}

// push should not be blocked for long time in any cases.
func (q *regionJobRetryer) push(job *regionJob) bool {
q.closeMu.Lock()
defer q.closeMu.Unlock()
if q.closed {
q.protectedClosed.mu.Lock()
defer q.protectedClosed.mu.Unlock()
if q.protectedClosed.closed {
return false
}

q.protected.mu.Lock()
heap.Push(&q.protected.q, job)
q.protected.mu.Unlock()
q.protectedQueue.mu.Lock()
heap.Push(&q.protectedQueue.q, job)
q.protectedQueue.mu.Unlock()

select {
case q.reload <- struct{}{}:
Expand All @@ -796,20 +809,25 @@ func (q *regionJobRetryer) push(job *regionJob) bool {
}

// close will return the number of jobs that are not put back when first called.
// close MUST be called when caller is sure that regionJobRetryer holds no job
// or regionJobRetryer.run has exited.
func (q *regionJobRetryer) close() int {
q.closeMu.Lock()
defer q.closeMu.Unlock()
if q.closed {
q.protectedClosed.mu.Lock()
defer q.protectedClosed.mu.Unlock()
if q.protectedClosed.closed {
return 0
}
q.closed = true
q.protectedClosed.closed = true

close(q.done)

q.protected.mu.Lock()
defer q.protected.mu.Unlock()
ret := len(q.protected.q)
if q.protected.toPutBack != nil {
// don't need to lock them in fact because of the requirement of calling close
q.protectedQueue.mu.Lock()
defer q.protectedQueue.mu.Unlock()
q.protectedToPutBack.mu.Lock()
defer q.protectedToPutBack.mu.Unlock()
ret := len(q.protectedQueue.q)
if q.protectedToPutBack.toPutBack != nil {
ret++
}
return ret
Expand Down
14 changes: 12 additions & 2 deletions br/pkg/lightning/backend/local/region_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ func TestRegionJobRetryer(t *testing.T) {
ok = retryer.push(job)
require.False(t, ok)

// test when putBackCh is blocked, retryer.close is correct
// test when putBackCh is blocked, retryer.push is not blocked and
// the return value of retryer.close is correct

done = make(chan struct{})
putBackCh = make(chan *regionJob)
Expand All @@ -247,7 +248,16 @@ func TestRegionJobRetryer(t *testing.T) {
ok = retryer.push(job)
require.True(t, ok)
time.Sleep(3 * time.Second)
// now retryer is sending to putBackCh, but putBackCh is blocked
job = &regionJob{
keyRange: Range{
start: []byte("456"),
},
waitUntil: time.Now().Add(-time.Second),
}
ok = retryer.push(job)
require.True(t, ok)
remainCnt = retryer.close()
require.Equal(t, 1, remainCnt)
require.Equal(t, 2, remainCnt)
<-done
}

0 comments on commit 45b208d

Please sign in to comment.