Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: simplify StatsCacheInner #45256

Merged
merged 8 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions statistics/handle/cache/internal/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,10 @@ type StatsCacheInner interface {
Values() []*statistics.Table
// Len returns the length of the cache.
Len() int
FreshMemUsage()
// Copy returns a copy of the cache
Copy() StatsCacheInner
// SetCapacity sets the capacity of the cache
SetCapacity(int64)
// Version returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
Version() (maxTableStatsVersion uint64)

// Front returns the front element's owner tableID, only used for test
// TODO: this method is mainly for test, remove it in the future.
Expand Down
21 changes: 0 additions & 21 deletions statistics/handle/cache/internal/lru/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type StatsInnerCache struct {
// lru maintains item lru cache
lru *innerItemLruCache
sync.RWMutex
maxTableStatsVersion uint64
}

// NewStatsLruCache creates a new LRU cache for statistics.
Expand Down Expand Up @@ -129,9 +128,6 @@ func (s *StatsInnerCache) Put(tblID int64, tbl *statistics.Table) {
}

func (s *StatsInnerCache) put(tblID int64, tbl *statistics.Table, tblMemUsage *statistics.TableMemoryUsage, needMove bool) {
if tbl.Version > s.maxTableStatsVersion {
s.maxTableStatsVersion = tbl.Version
}
element, exist := s.elements[tblID]
if exist {
s.updateColumns(tblID, tbl, tblMemUsage, needMove)
Expand Down Expand Up @@ -270,15 +266,6 @@ func (s *StatsInnerCache) Len() int {
return len(s.elements)
}

// FreshMemUsage implements statsCacheInner
func (s *StatsInnerCache) FreshMemUsage() {
s.Lock()
defer s.Unlock()
for tblID, element := range s.elements {
s.freshTableCost(tblID, element)
}
}

// Copy implements statsCacheInner
func (s *StatsInnerCache) Copy() internal.StatsCacheInner {
s.RLock()
Expand All @@ -289,17 +276,9 @@ func (s *StatsInnerCache) Copy() internal.StatsCacheInner {
newCache.elements[tblID] = element.copy()
}
newCache.lru.onEvict = newCache.onEvict
newCache.maxTableStatsVersion = s.maxTableStatsVersion
return newCache
}

// Version implements statsCacheInner
func (s *StatsInnerCache) Version() uint64 {
s.RLock()
defer s.RUnlock()
return s.maxTableStatsVersion
}

// SetCapacity implements statsCacheInner
func (s *StatsInnerCache) SetCapacity(c int64) {
s.Lock()
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/cache/internal/lru/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,10 @@ func TestLRUFreshMemUsage(t *testing.T) {
lru.Put(int64(3), t3)
require.Equal(t, lru.Cost(), 6*mockCMSMemoryUsage+6*mockCMSMemoryUsage)
mockTableAppendColumn(t1)
lru.FreshMemUsage()
lru.Put(int64(1), t1)
require.Equal(t, lru.Cost(), 6*mockCMSMemoryUsage+7*mockCMSMemoryUsage)
mockTableAppendIndex(t1)
lru.FreshMemUsage()
lru.Put(int64(1), t1)
require.Equal(t, lru.Cost(), 7*mockCMSMemoryUsage+7*mockCMSMemoryUsage)

mockTableRemoveColumn(t1)
Expand Down
27 changes: 4 additions & 23 deletions statistics/handle/cache/internal/mapcache/map_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ func (c cacheItem) copy() cacheItem {

// MapCache is a cache based on map.
type MapCache struct {
tables map[int64]cacheItem
memUsage int64
maxTableStatsVersion uint64
tables map[int64]cacheItem
memUsage int64
}

// NewMapCache creates a new map cache.
Expand Down Expand Up @@ -66,9 +65,6 @@ func (m *MapCache) PutByQuery(k int64, v *statistics.Table) {

// Put implements StatsCacheInner
func (m *MapCache) Put(k int64, v *statistics.Table) {
if v.Version > m.maxTableStatsVersion {
m.maxTableStatsVersion = v.Version
}
item, ok := m.tables[k]
if ok {
oldCost := item.cost
Expand Down Expand Up @@ -136,21 +132,11 @@ func (m *MapCache) Len() int {
return len(m.tables)
}

// FreshMemUsage implements StatsCacheInner
func (m *MapCache) FreshMemUsage() {
for _, v := range m.tables {
oldCost := v.cost
newCost := v.value.MemoryUsage().TotalMemUsage
m.memUsage += newCost - oldCost
}
}

// Copy implements StatsCacheInner
func (m *MapCache) Copy() internal.StatsCacheInner {
newM := &MapCache{
tables: make(map[int64]cacheItem, len(m.tables)),
memUsage: m.memUsage,
maxTableStatsVersion: m.maxTableStatsVersion,
tables: make(map[int64]cacheItem, len(m.tables)),
memUsage: m.memUsage,
}
for k, v := range m.tables {
newM.tables[k] = v.copy()
Expand All @@ -165,8 +151,3 @@ func (*MapCache) SetCapacity(int64) {}
func (*MapCache) Front() int64 {
return 0
}

// Version implements StatsCacheInner
func (m *MapCache) Version() uint64 {
return m.maxTableStatsVersion
}
53 changes: 36 additions & 17 deletions statistics/handle/cache/statscacheinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -72,6 +73,8 @@ func NewStatsCache() *StatsCache {
// StatsCache caches the tables in memory for Handle.
type StatsCache struct {
c internal.StatsCacheInner
// the max table stats version the cache has in its lifecycle.
maxTblStatsVer atomic.Uint64
}

// Len returns the number of tables in the cache.
Expand All @@ -80,6 +83,9 @@ func (sc *StatsCache) Len() int {
}

// Get returns the statistics of the specified Table ID.
// The returned value should be read-only, if you update it, don't forget to use Put to put it back again, otherwise the memory trace can be inaccurate.
//
// e.g. v := sc.Get(id); /* update the value */ v.Version = 123; sc.Put(id, v);
func (sc *StatsCache) Get(id int64) (*statistics.Table, bool) {
return sc.c.Get(id)
}
Expand All @@ -93,6 +99,13 @@ func (sc *StatsCache) GetByQuery(id int64) (*statistics.Table, bool) {
// Put puts the table statistics to the cache.
func (sc *StatsCache) Put(id int64, t *statistics.Table) {
sc.c.Put(id, t)

// update the maxTblStatsVer
for v := sc.maxTblStatsVer.Load(); v < t.Version; v = sc.maxTblStatsVer.Load() {
if sc.maxTblStatsVer.CompareAndSwap(v, t.Version) {
break
} // other goroutines have updated the sc.maxTblStatsVer, so we need to check again.
}
}

// Values returns all the cached statistics tables.
Expand All @@ -101,8 +114,13 @@ func (sc *StatsCache) Values() []*statistics.Table {
}

// FreshMemUsage refreshes the memory usage of the cache.
// Values in StatsCache should be read-only, but when initializing the cache, some values can be modified.
// To make the memory cost more accurate, we need to refresh the memory usage of the cache after finishing the initialization.
func (sc *StatsCache) FreshMemUsage() {
sc.c.FreshMemUsage()
values := sc.c.Values()
for _, v := range values {
sc.c.Put(v.PhysicalID, v)
chrysan marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Cost returns the memory usage of the cache.
Expand All @@ -118,40 +136,41 @@ func (sc *StatsCache) SetCapacity(c int64) {
// Version returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
func (sc *StatsCache) Version() uint64 {
return sc.c.Version()
return sc.maxTblStatsVer.Load()
}

// Front returns the front element's owner tableID, only used for test.
func (sc *StatsCache) Front() int64 {
return sc.c.Front()
}

// Update updates the statistics table cache using Copy on write.
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache {
// CopyAndUpdate copies a new cache and updates the new statistics table cache.
func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache {
option := &TableStatsOption{}
for _, opt := range opts {
opt(option)
}
newCache := &StatsCache{}
newCache.c = CopyAndUpdateStatsCache(sc.c, tables, deletedIDs, option.byQuery)
return newCache
}

// CopyAndUpdateStatsCache copies the base cache and applies some updates to the new copied cache, the base cache should keep unchanged.
func CopyAndUpdateStatsCache(base internal.StatsCacheInner, tables []*statistics.Table, deletedIDs []int64, byQuery bool) internal.StatsCacheInner {
c := base.Copy()
newCache := &StatsCache{c: sc.c.Copy()}
newCache.maxTblStatsVer.Store(sc.maxTblStatsVer.Load())
for _, tbl := range tables {
id := tbl.PhysicalID
if byQuery {
c.PutByQuery(id, tbl)
if option.byQuery {
newCache.c.PutByQuery(id, tbl)
} else {
c.Put(id, tbl)
newCache.c.Put(id, tbl)
}
}
for _, id := range deletedIDs {
c.Del(id)
newCache.c.Del(id)
}
return c

// update the maxTblStatsVer
for _, t := range tables {
if t.Version > newCache.maxTblStatsVer.Load() {
newCache.maxTblStatsVer.Store(t.Version)
}
}
return newCache
}

// TableRowStatsCache is the cache of table row count.
Expand Down
12 changes: 6 additions & 6 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func (h *Handle) Update(is infoschema.InfoSchema, opts ...cache.TableStatsOpt) e
tbl.TblInfoUpdateTS = tableInfo.UpdateTS
tables = append(tables, tbl)
}
h.updateStatsCache(oldCache.Update(tables, deletedTableIDs, opts...))
h.updateStatsCache(oldCache.CopyAndUpdate(tables, deletedTableIDs, opts...))
return nil
}

Expand Down Expand Up @@ -996,7 +996,7 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64, opts ...
tbl = statistics.PseudoTable(tblInfo)
tbl.PhysicalID = pid
if tblInfo.GetPartitionInfo() == nil || h.statsCacheLen() < 64 {
h.updateStatsCache(statsCache.Update([]*statistics.Table{tbl}, nil))
h.updateStatsCache(statsCache.CopyAndUpdate([]*statistics.Table{tbl}, nil))
}
return tbl
}
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func (h *Handle) loadNeededColumnHistograms(reader *statistics.StatsReader, col
}
tbl = tbl.Copy()
tbl.Columns[c.ID] = colHist
if h.updateStatsCache(oldCache.Update([]*statistics.Table{tbl}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{tbl}, nil)) {
statistics.HistogramNeededItems.Delete(col)
}
return nil
Expand Down Expand Up @@ -1155,7 +1155,7 @@ func (h *Handle) loadNeededIndexHistograms(reader *statistics.StatsReader, idx m
}
tbl = tbl.Copy()
tbl.Indices[idx.ID] = idxHist
if h.updateStatsCache(oldCache.Update([]*statistics.Table{tbl}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{tbl}, nil)) {
statistics.HistogramNeededItems.Delete(idx)
}
return nil
Expand Down Expand Up @@ -1798,7 +1798,7 @@ func (h *Handle) removeExtendedStatsItem(tableID int64, statsName string) {
}
newTbl := tbl.Copy()
delete(newTbl.ExtendedStats.Stats, statsName)
if h.updateStatsCache(oldCache.Update([]*statistics.Table{newTbl}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{newTbl}, nil)) {
return
}
if retry == 1 {
Expand Down Expand Up @@ -1831,7 +1831,7 @@ func (h *Handle) ReloadExtendedStatistics() error {
}
tables = append(tables, t)
}
if h.updateStatsCache(oldCache.Update(tables, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate(tables, nil)) {
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
tbl = tbl.Copy()
tbl.Indices[item.ID] = idxHist
}
return h.updateStatsCache(oldCache.Update([]*statistics.Table{tbl}, nil, cache.WithTableStatsByQuery()))
return h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{tbl}, nil, cache.WithTableStatsByQuery()))
}

func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ OUTER:
}
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load()
if h.updateStatsCache(oldCache.Update([]*statistics.Table{newTblStats}, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate([]*statistics.Table{newTblStats}, nil)) {
break
}
}
Expand Down Expand Up @@ -797,7 +797,7 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {
h.mu.Unlock()
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load()
if h.updateStatsCache(oldCache.Update(tbls, nil)) {
if h.updateStatsCache(oldCache.CopyAndUpdate(tbls, nil)) {
break
}
}
Expand Down