Skip to content

Commit

Permalink
util: fix memory.reArrangeFallback cpu usage (pingcap#30414) (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot committed Apr 26, 2022
1 parent 9dd93df commit 20de896
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 12 deletions.
12 changes: 2 additions & 10 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8206,16 +8206,8 @@ func (s *testSuite) TestOOMActionPriority(c *C) {
tk.MustExec("create table t4(a int)")
tk.MustExec("insert into t4 values(1)")
tk.MustQuery("select * from t0 join t1 join t2 join t3 join t4 order by t0.a").Check(testkit.Rows("1 1 1 1 1"))
action := tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest()
// check the first 5 actions is rate limit.
for i := 0; i < 5; i++ {
c.Assert(action.GetPriority(), Equals, int64(memory.DefRateLimitPriority))
action = action.GetFallback()
}
for action.GetFallback() != nil {
c.Assert(action.GetPriority(), Equals, int64(memory.DefSpillPriority))
action = action.GetFallback()
}
action := tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest(true)
// All actions are finished and removed.
c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority))
}

Expand Down
3 changes: 3 additions & 0 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (e *SortExec) Close() error {
e.memTracker = nil
e.diskTracker = nil
e.multiWayMerge = nil
if e.spillAction != nil {
e.spillAction.SetFinished()
}
e.spillAction = nil
return e.children[0].Close()
}
Expand Down
1 change: 1 addition & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,7 @@ func (e *rateLimitAction) close() {
e.conditionLock()
defer e.conditionUnlock()
e.cond.exceeded = false
e.SetFinished()
}

func (e *rateLimitAction) setEnabled(enabled bool) {
Expand Down
1 change: 1 addition & 0 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ func (c *RowContainer) Close() (err error) {
// Set status to spilledYet to avoid spilling.
c.actionSpill.setStatus(spilledYet)
c.actionSpill.cond.Broadcast()
c.actionSpill.SetFinished()
}
if c.alreadySpilled() {
err = c.m.records.inDisk.Close()
Expand Down
21 changes: 20 additions & 1 deletion util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package memory
import (
"fmt"
"sync"
"sync/atomic"

"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/util/dbterror"
Expand All @@ -40,11 +41,16 @@ type ActionOnExceed interface {
GetFallback() ActionOnExceed
// GetPriority get the priority of the Action.
GetPriority() int64
// SetFinished sets the finished state of the Action.
SetFinished()
// IsFinished returns the finished state of the Action.
IsFinished() bool
}

// BaseOOMAction manages the fallback action for all Action.
type BaseOOMAction struct {
fallbackAction ActionOnExceed
finished int32
}

// SetFallback sets a fallback action which will be triggered if itself has
Expand All @@ -53,8 +59,21 @@ func (b *BaseOOMAction) SetFallback(a ActionOnExceed) {
b.fallbackAction = a
}

// GetFallback get the fallback action of the Action.
// SetFinished sets the finished state of the Action.
func (b *BaseOOMAction) SetFinished() {
atomic.StoreInt32(&b.finished, 1)
}

// IsFinished returns the finished state of the Action.
func (b *BaseOOMAction) IsFinished() bool {
return atomic.LoadInt32(&b.finished) == 1
}

// GetFallback get the fallback action and remove finished fallback.
func (b *BaseOOMAction) GetFallback() ActionOnExceed {
for b.fallbackAction != nil && b.fallbackAction.IsFinished() {
b.SetFallback(b.fallbackAction.GetFallback())
}
return b.fallbackAction
}

Expand Down
8 changes: 7 additions & 1 deletion util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,12 @@ func (t *Tracker) FallbackOldAndSetNewActionForSoftLimit(a ActionOnExceed) {
}

// GetFallbackForTest get the oom action used by test.
func (t *Tracker) GetFallbackForTest() ActionOnExceed {
func (t *Tracker) GetFallbackForTest(ignoreFinishedAction bool) ActionOnExceed {
t.actionMuForHardLimit.Lock()
defer t.actionMuForHardLimit.Unlock()
if t.actionMuForHardLimit.actionOnExceed != nil && t.actionMuForHardLimit.actionOnExceed.IsFinished() && ignoreFinishedAction {
t.actionMuForHardLimit.actionOnExceed = t.actionMuForHardLimit.actionOnExceed.GetFallback()
}
return t.actionMuForHardLimit.actionOnExceed
}

Expand Down Expand Up @@ -332,6 +335,9 @@ func (t *Tracker) Consume(bytes int64) {
tryAction := func(mu *actionMu, tracker *Tracker) {
mu.Lock()
defer mu.Unlock()
for mu.actionOnExceed != nil && mu.actionOnExceed.IsFinished() {
mu.actionOnExceed = mu.actionOnExceed.GetFallback()
}
if mu.actionOnExceed != nil {
mu.actionOnExceed.Action(tracker)
}
Expand Down
19 changes: 19 additions & 0 deletions util/memory/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,25 @@ func TestOOMAction(t *testing.T) {
require.True(t, action1.called)
require.True(t, action2.called) // SoftLimit fallback
require.True(t, action3.called) // HardLimit

// test fallback
action1 = &mockAction{}
action2 = &mockAction{}
action3 = &mockAction{}
action4 := &mockAction{}
action5 := &mockAction{}
tracker.SetActionOnExceed(action1)
tracker.FallbackOldAndSetNewAction(action2)
tracker.FallbackOldAndSetNewAction(action3)
tracker.FallbackOldAndSetNewAction(action4)
tracker.FallbackOldAndSetNewAction(action5)
require.Equal(t, action1, tracker.actionMuForHardLimit.actionOnExceed)
require.Equal(t, action2, tracker.actionMuForHardLimit.actionOnExceed.GetFallback())
action2.SetFinished()
require.Equal(t, action3, tracker.actionMuForHardLimit.actionOnExceed.GetFallback())
action3.SetFinished()
action4.SetFinished()
require.Equal(t, action5, tracker.actionMuForHardLimit.actionOnExceed.GetFallback())
}

type mockAction struct {
Expand Down

0 comments on commit 20de896

Please sign in to comment.