Skip to content

Commit

Permalink
stats: reduce stats collecor's lock contention (pingcap#9233)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx committed Feb 25, 2019
1 parent e58cae4 commit 2512d2b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 43 deletions.
70 changes: 31 additions & 39 deletions statistics/update.go
Expand Up @@ -112,18 +112,14 @@ func (m errorRateDeltaMap) clear(tableID int64, histID int64, isIndex bool) {
m[tableID] = item
}

func (h *Handle) merge(s *SessionStatsCollector) {
s.Lock()
defer s.Unlock()
func (h *Handle) merge(s *SessionStatsCollector, rateMap errorRateDeltaMap) {
for id, item := range s.mapper {
h.globalMap.update(id, item.Delta, item.Count, &item.ColSize)
}
h.mu.Lock()
h.mu.rateMap.merge(s.rateMap)
h.mu.Unlock()
s.mapper = make(tableDeltaMap)
rateMap.merge(s.rateMap)
s.rateMap = make(errorRateDeltaMap)
h.feedback = mergeQueryFeedback(h.feedback, s.feedback)
s.mapper = make(tableDeltaMap)
s.feedback = s.feedback[:0]
}

Expand All @@ -134,7 +130,6 @@ type SessionStatsCollector struct {
mapper tableDeltaMap
feedback []*QueryFeedback
rateMap errorRateDeltaMap
prev *SessionStatsCollector
next *SessionStatsCollector
// deleted is set to true when a session is closed. Every time we sweep the list, we will remove the useless collector.
deleted bool
Expand Down Expand Up @@ -207,21 +202,6 @@ func (s *SessionStatsCollector) StoreQueryFeedback(feedback interface{}, h *Hand
return nil
}

// tryToRemoveFromList will remove this collector from the list if it's deleted flag is set.
func (s *SessionStatsCollector) tryToRemoveFromList() {
s.Lock()
defer s.Unlock()
if !s.deleted {
return
}
next := s.next
prev := s.prev
prev.next = next
if next != nil {
next.prev = prev
}
}

// NewSessionStatsCollector allocates a stats collector for a session.
func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
h.listHead.Lock()
Expand All @@ -230,10 +210,6 @@ func (h *Handle) NewSessionStatsCollector() *SessionStatsCollector {
mapper: make(tableDeltaMap),
rateMap: make(errorRateDeltaMap),
next: h.listHead.next,
prev: h.listHead,
}
if h.listHead.next != nil {
h.listHead.next.prev = newCollector
}
h.listHead.next = newCollector
return newCollector
Expand Down Expand Up @@ -275,15 +251,36 @@ const (
DumpDelta = false
)

// sweepList will loop over the list, merge each session's local stats into handle
// and remove closed session's collector.
func (h *Handle) sweepList() {
prev := h.listHead
prev.Lock()
errorRateMap := make(errorRateDeltaMap)
for curr := prev.next; curr != nil; curr = curr.next {
curr.Lock()
// Merge the session stats into handle and error rate map.
h.merge(curr, errorRateMap)
if curr.deleted {
prev.next = curr.next
// Since the session is already closed, we can safely unlock it here.
curr.Unlock()
} else {
// Unlock the previous lock, so we only holds at most two session's lock at the same time.
prev.Unlock()
prev = curr
}
}
prev.Unlock()
h.mu.Lock()
h.mu.rateMap.merge(errorRateMap)
h.mu.Unlock()
}

// DumpStatsDeltaToKV sweeps the whole list and updates the global map, then we dumps every table that held in map to KV.
// If the `dumpAll` is false, it will only dump that delta info that `Modify Count / Table Count` greater than a ratio.
func (h *Handle) DumpStatsDeltaToKV(dumpMode bool) error {
h.listHead.Lock()
for collector := h.listHead.next; collector != nil; collector = collector.next {
collector.tryToRemoveFromList()
h.merge(collector)
}
h.listHead.Unlock()
h.sweepList()
currentTime := time.Now()
for id, item := range h.globalMap {
if dumpMode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
Expand Down Expand Up @@ -416,12 +413,7 @@ func (h *Handle) dumpFeedbackToKV(fb *QueryFeedback) error {
// it takes 10 minutes for a feedback to take effect. However, we can use the
// feedback locally on this tidb-server, so it could be used more timely.
func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) {
h.listHead.Lock()
for collector := h.listHead.next; collector != nil; collector = collector.next {
collector.tryToRemoveFromList()
h.merge(collector)
}
h.listHead.Unlock()
h.sweepList()
for _, fb := range h.feedback {
table, ok := is.TableByID(fb.tableID)
if !ok {
Expand Down
6 changes: 2 additions & 4 deletions statistics/update_list_test.go
Expand Up @@ -31,17 +31,15 @@ func (s *testUpdateListSuite) TestInsertAndDelete(c *C) {
items[0].Delete() // delete tail
items[2].Delete() // delete middle
items[4].Delete() // delete head
h.DumpStatsDeltaToKV(DumpAll)
h.sweepList()

c.Assert(h.listHead.next, Equals, items[3])
c.Assert(items[3].next, Equals, items[1])
c.Assert(items[1].next, IsNil)
c.Assert(items[1].prev, Equals, items[3])
c.Assert(items[3].prev, Equals, h.listHead)

// delete rest
items[1].Delete()
items[3].Delete()
h.DumpStatsDeltaToKV(DumpAll)
h.sweepList()
c.Assert(h.listHead.next, IsNil)
}

0 comments on commit 2512d2b

Please sign in to comment.