Skip to content

Commit

Permalink
planner: refactor to make tableStatsDelta thread-safe (#46977)
Browse files Browse the repository at this point in the history
ref #46905
  • Loading branch information
qw4990 committed Sep 14, 2023
1 parent 32d0d62 commit 322104f
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 41 deletions.
21 changes: 8 additions & 13 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,11 @@ type Handle struct {
// written only after acquiring the lock.
statsCache *cache.StatsCachePointer

// globalMap contains all the delta map from collectors when we dump them to KV.
globalMap struct {
data tableDeltaMap
sync.Mutex
}
// tableDelta contains all the delta maps merged from collectors, kept until we dump them to KV.
tableDelta *tableDelta

// colMap contains all the column stats usage information from collectors when we dump them to KV.
colMap *statsUsage
// statsUsage contains all the column stats usage information from collectors when we dump them to KV.
statsUsage *statsUsage

// StatsLoad is used to load stats concurrently
StatsLoad StatsLoad
Expand Down Expand Up @@ -182,10 +179,8 @@ func (h *Handle) Clear() {
h.mu.ctx.GetSessionVars().EnableChunkRPC = false
h.mu.ctx.GetSessionVars().SetProjectionConcurrency(0)
h.listHead.ClearForTest()
h.globalMap.Lock()
h.globalMap.data = make(tableDeltaMap)
h.globalMap.Unlock()
h.colMap.reset()
h.tableDelta.reset()
h.statsUsage.reset()
h.mu.Unlock()
}

Expand Down Expand Up @@ -215,8 +210,8 @@ func NewHandle(ctx, initStatsCtx sessionctx.Context, lease time.Duration, pool s
return nil, err
}
handle.statsCache = statsCache
handle.globalMap.data = make(tableDeltaMap)
handle.colMap = newStatsUsage()
handle.tableDelta = newTableDelta()
handle.statsUsage = newStatsUsage()
handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
Expand Down
82 changes: 55 additions & 27 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,40 @@ import (
"go.uber.org/zap"
)

type tableDeltaMap map[int64]variable.TableDelta
// tableDelta collects the change information of tables, keyed by table ID.
// Every method defined on it acquires lock before touching delta, so all of
// its methods are thread-safe.
type tableDelta struct {
	// lock guards delta.
	lock sync.Mutex
	// delta maps a table ID to the delta accumulated for that table.
	delta map[int64]variable.TableDelta
}

// newTableDelta returns a tableDelta whose delta map is already allocated
// and empty, so it can be written to immediately.
func newTableDelta() *tableDelta {
	td := new(tableDelta)
	td.delta = make(map[int64]variable.TableDelta)
	return td
}

func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[int64]int64) {
// reset discards everything collected so far by replacing the delta map
// with a fresh, empty one while holding the lock.
func (m *tableDelta) reset() {
	empty := make(map[int64]variable.TableDelta)

	m.lock.Lock()
	m.delta = empty
	m.lock.Unlock()
}

// getDeltaAndReset hands the currently collected delta map to the caller and
// installs a fresh, empty map in its place. Both steps happen under the lock;
// after the call the tableDelta no longer references the returned map.
func (m *tableDelta) getDeltaAndReset() (collected map[int64]variable.TableDelta) {
	m.lock.Lock()
	defer m.lock.Unlock()
	collected, m.delta = m.delta, make(map[int64]variable.TableDelta)
	return collected
}

// update applies one change record for table id to the collected deltas by
// delegating to updateTableDeltaMap while holding the lock.
// delta, count and colSize are forwarded unchanged to that helper.
func (m *tableDelta) update(id, delta, count int64, colSize *map[int64]int64) {
	m.lock.Lock()
	// Unlock via defer so the lock is released even if the helper panics.
	defer m.lock.Unlock()

	target := m.delta
	updateTableDeltaMap(target, id, delta, count, colSize)
}

func updateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) {
item := m[id]
item.Delta += delta
item.Count += count
Expand All @@ -59,13 +90,19 @@ func (m tableDeltaMap) update(id int64, delta int64, count int64, colSize *map[i
m[id] = item
}

func (m tableDeltaMap) merge(deltaMap tableDeltaMap) {
func (m *tableDelta) merge(deltaMap map[int64]variable.TableDelta) {
if len(deltaMap) == 0 {
return
}
m.lock.Lock()
defer m.lock.Unlock()
for id, item := range deltaMap {
m.update(id, item.Delta, item.Count, &item.ColSize)
updateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize)
}
}

// statsUsage maps (tableID, columnID) to the last time when the column stats are used (needed).
// All methods of it are thread-safe.
type statsUsage struct {
usage map[model.TableItemID]time.Time
lock sync.RWMutex
Expand Down Expand Up @@ -104,15 +141,14 @@ func (m *statsUsage) merge(other map[model.TableItemID]time.Time) {
}
}

func merge(s *SessionStatsCollector, deltaMap tableDeltaMap, colMap *statsUsage) {
deltaMap.merge(s.mapper)
s.mapper = make(tableDeltaMap)
// merge drains the session collector s into the given accumulators: the
// table deltas taken from s.mapper are merged into deltaMap, then the column
// usage taken from s.statsUsage is merged into colMap. Both sources in s are
// reset as part of being drained.
func merge(s *SessionStatsCollector, deltaMap *tableDelta, colMap *statsUsage) {
	sessionDelta := s.mapper.getDeltaAndReset()
	deltaMap.merge(sessionDelta)

	sessionUsage := s.statsUsage.getUsageAndReset()
	colMap.merge(sessionUsage)
}

// SessionStatsCollector is a list item that holds the delta mapper. If you want to write or read mapper, you must lock it.
type SessionStatsCollector struct {
mapper tableDeltaMap
mapper *tableDelta
statsUsage *statsUsage
next *SessionStatsCollector
sync.Mutex
Expand All @@ -124,7 +160,7 @@ type SessionStatsCollector struct {
// NewSessionStatsCollector initializes a new SessionStatsCollector.
func NewSessionStatsCollector() *SessionStatsCollector {
return &SessionStatsCollector{
mapper: make(tableDeltaMap),
mapper: newTableDelta(),
statsUsage: newStatsUsage(),
}
}
Expand All @@ -147,7 +183,7 @@ func (s *SessionStatsCollector) Update(id int64, delta int64, count int64, colSi
func (s *SessionStatsCollector) ClearForTest() {
s.Lock()
defer s.Unlock()
s.mapper = make(tableDeltaMap)
s.mapper = newTableDelta()
s.statsUsage = newStatsUsage()
s.next = nil
s.deleted = false
Expand All @@ -165,7 +201,7 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
h.listHead.Lock()
defer h.listHead.Unlock()
newCollector := &SessionStatsCollector{
mapper: make(tableDeltaMap),
mapper: newTableDelta(),
next: h.listHead.next,
statsUsage: newStatsUsage(),
}
Expand Down Expand Up @@ -376,7 +412,7 @@ const (
// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
deltaMap := make(tableDeltaMap)
deltaMap := newTableDelta()
colMap := newStatsUsage()
prev := h.listHead
prev.Lock()
Expand All @@ -395,25 +431,17 @@ func (h *Handle) sweepList() {
}
}
prev.Unlock()
h.globalMap.Lock()
h.globalMap.data.merge(deltaMap)
h.globalMap.Unlock()
h.colMap.merge(colMap.getUsageAndReset())
h.tableDelta.merge(deltaMap.getDeltaAndReset())
h.statsUsage.merge(colMap.getUsageAndReset())
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then dumps every table held in the map to KV.
// If the mode is `DumpDelta`, it will only dump the delta info whose `Modify Count / Table Count` is greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
h.sweepList()
h.globalMap.Lock()
deltaMap := h.globalMap.data
h.globalMap.data = make(tableDeltaMap)
h.globalMap.Unlock()
deltaMap := h.tableDelta.getDeltaAndReset()
defer func() {
h.globalMap.Lock()
deltaMap.merge(h.globalMap.data)
h.globalMap.data = deltaMap
h.globalMap.Unlock()
h.tableDelta.merge(deltaMap)
}()
// TODO: pass in do.InfoSchema() to DumpStatsDeltaToKV.
is := func() infoschema.InfoSchema {
Expand All @@ -431,7 +459,7 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
return errors.Trace(err)
}
if updated {
deltaMap.update(id, -item.Delta, -item.Count, nil)
updateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil)
}
if err = h.dumpTableStatColSizeToKV(id, item); err != nil {
delete(deltaMap, id)
Expand Down Expand Up @@ -551,9 +579,9 @@ func (h *Handle) DumpColStatsUsageToKV() error {
return nil
}
h.sweepList()
colMap := h.colMap.getUsageAndReset()
colMap := h.statsUsage.getUsageAndReset()
defer func() {
h.colMap.merge(colMap)
h.statsUsage.merge(colMap)
}()
type pair struct {
lastUsedAt string
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/update_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

func TestInsertAndDelete(t *testing.T) {
h := Handle{
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap)},
listHead: &SessionStatsCollector{mapper: newTableDelta()},
}
var items []*SessionStatsCollector
for i := 0; i < 5; i++ {
Expand Down

0 comments on commit 322104f

Please sign in to comment.