domain,executor: store topN slow query in domain #7646

Merged
merged 13 commits into from Sep 12, 2018
46 changes: 46 additions & 0 deletions domain/domain.go
@@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/execdetails"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -60,6 +61,7 @@ type Domain struct {
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQuery

MockReloadFailed MockFailure // It mocks reload failed.
}
@@ -329,6 +331,46 @@ func (do *Domain) Reload() error {
return nil
}

// LogTopNSlowQuery keeps topN recent slow queries in domain.
func (do *Domain) LogTopNSlowQuery(sql string, start time.Time, duration time.Duration,
Review comment (Member): This function takes so many parameters that it is hard to read and maintain. Could you extract a struct to hold them and pass that struct instead?

Reply (Collaborator): Maybe slowQueryInfo is 🐶

Reply (Member): How about exporting slowQueryInfo and using it as the parameter?

detail execdetails.ExecDetails,
succ bool, connID, txnTS uint64,
user, db, tableIDs, indexIDs string) {
select {
case do.slowQuery.ch <- &slowQueryInfo{
sql: sql,
start: start,
duration: duration,
detail: detail,
succ: succ,
connID: connID,
txnTS: txnTS,
user: user,
db: db,
tableIDs: tableIDs,
indexIDs: indexIDs,
}:
default:
}
}
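
Illustrating the review thread on LogTopNSlowQuery: a minimal sketch of passing one struct instead of a long parameter list, while keeping the non-blocking send the function already uses. The names SlowQueryInfo, recorder, and Log are hypothetical, the field list is trimmed, and this is not the code that was merged.

```go
package main

import (
	"fmt"
	"time"
)

// SlowQueryInfo is a hypothetical exported variant of slowQueryInfo,
// trimmed to a few fields for illustration.
type SlowQueryInfo struct {
	SQL      string
	Start    time.Time
	Duration time.Duration
	Succ     bool
	ConnID   uint64
}

// recorder stands in for the Domain; only the buffered channel matters here.
type recorder struct {
	ch chan *SlowQueryInfo
}

// Log tries to enqueue info without blocking and reports whether it was accepted.
func (r *recorder) Log(info *SlowQueryInfo) bool {
	select {
	case r.ch <- info:
		return true
	default:
		// Buffer is full: drop the record rather than stall the query path.
		return false
	}
}

func main() {
	r := &recorder{ch: make(chan *SlowQueryInfo, 1)}
	fmt.Println(r.Log(&SlowQueryInfo{SQL: "select 1", Duration: time.Second})) // accepted
	fmt.Println(r.Log(&SlowQueryInfo{SQL: "select 2", Duration: time.Second})) // dropped, buffer full
}
```

With a struct, adding a field later does not change the call signature, and call sites name each value explicitly.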

func (do *Domain) topNSlowQueryLoop() {
defer do.wg.Done()
Review comment (Contributor): Recover from panics in this goroutine.

ticker := time.NewTicker(time.Minute * 10)
defer ticker.Stop()
for {
select {
case now := <-ticker.C:
do.slowQuery.Refresh(now)
case info, ok := <-do.slowQuery.ch:
if !ok {
return
}
do.slowQuery.Push(info)
}
}
}
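
One way to read the reviewer's request to recover this goroutine is sketched below. The helper safeGo is hypothetical and not part of TiDB; it only shows the defer/recover pattern. In the real loop the recover would sit alongside the existing `defer do.wg.Done()`, and the recovered value would presumably be logged.

```go
package main

import "fmt"

// safeGo runs fn in a new goroutine and returns a channel that receives
// the recovered panic value, or nil if fn returned normally.
func safeGo(fn func()) <-chan interface{} {
	done := make(chan interface{}, 1)
	go func() {
		defer func() {
			// recover must be called directly inside the deferred function.
			r := recover()
			done <- r
		}()
		fn()
	}()
	return done
}

func main() {
	fmt.Println(<-safeGo(func() { panic("boom") })) // the panic is contained
	fmt.Println(<-safeGo(func() {}))                // no panic: nil
}
```

Without a recover, a panic inside a background goroutine terminates the whole process, which is likely why the reviewer asked for it here.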

func (do *Domain) loadSchemaInLoop(lease time.Duration) {
defer do.wg.Done()
// Lease renewal can run at any frequency.
@@ -408,6 +450,7 @@ func (do *Domain) Close() {
if do.etcdClient != nil {
terror.Log(errors.Trace(do.etcdClient.Close()))
}
do.slowQuery.Close()
do.sysSessionPool.Close()
do.wg.Wait()
log.Info("[domain] close")
@@ -471,6 +514,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
sysSessionPool: pools.NewResourcePool(factory, capacity, capacity, resourceIdleTimeout),
statsLease: statsLease,
infoHandle: infoschema.NewHandle(store),
slowQuery: newTopNSlowQuery(30, time.Hour*24*7),
Review comment (Member): This should be configurable.

Reply (Contributor Author): I agree, but we can do it in another PR.

}
}

@@ -529,6 +573,8 @@ func (do *Domain) Init(ddlLease time.Duration, sysFactory func(*Domain) (pools.R
// Local store needs to get the change information for every DDL state in each session.
go do.loadSchemaInLoop(ddlLease)
}
do.wg.Add(1)
go do.topNSlowQueryLoop()

return nil
}
118 changes: 118 additions & 0 deletions domain/topn_slow_query.go
@@ -0,0 +1,118 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"time"

"github.com/pingcap/tidb/util/execdetails"
)

// topNSlowQuery maintains a heap to store recent slow queries.
// N = 30, recent = 7 days by default.
type topNSlowQuery struct {
Review comment (Member): There are multiple query entries, so I think topNSlowQueries is a better name.

data []*slowQueryInfo
offset int
recent time.Duration
ch chan *slowQueryInfo
}

func newTopNSlowQuery(topN int, recent time.Duration) *topNSlowQuery {
return &topNSlowQuery{
data: make([]*slowQueryInfo, topN),
recent: recent,
ch: make(chan *slowQueryInfo, 1000),
}
}

func (q *topNSlowQuery) Close() {
close(q.ch)
}

func (q *topNSlowQuery) Push(info *slowQueryInfo) {
Review comment (Member): Why not implement the Heap interface on this type? To avoid the name conflict, we can rename this Push to Add or Append.

// Heap is not full, append to it and shift up.
if q.offset < len(q.data) {
q.data[q.offset] = info
q.shiftUp(q.offset)
q.offset++
return
}

// Replace the heap top and shift down.
if info.duration > q.data[0].duration {
q.data[0] = info
for i := 0; i < q.offset; {
left := 2*i + 1
right := 2 * (i + 1)
if left >= q.offset {
break
}
smaller := left
if right < q.offset && q.data[right].duration < q.data[left].duration {
smaller = right
}
if q.data[i].duration <= q.data[smaller].duration {
break
}
q.data[i], q.data[smaller] = q.data[smaller], q.data[i]
i = smaller
}
}
}

func (q *topNSlowQuery) shiftUp(end int) {
Review comment (Member): Should this be named siftUp?

for i := end; i > 0; {
j := (i - 1) / 2
if q.data[j].duration < q.data[i].duration {
break
}
q.data[i], q.data[j] = q.data[j], q.data[i]
i = j
}
}

func (q *topNSlowQuery) Refresh(now time.Time) {
// Remove outdated slow query element.
idx := 0
for i := 0; i < q.offset; i++ {
outdateTime := q.data[i].start.Add(q.recent)
if outdateTime.After(now) {
q.data[idx] = q.data[i]
idx++
}
}
if q.offset == idx {
return
}
q.offset = idx

// Rebuild the heap.
for i := 1; i < q.offset; i++ {
q.shiftUp(i)
}
}
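
The first half of Refresh is an in-place filter: entries whose start time plus the retention window is still after `now` are compacted to the front. A standalone sketch of just that step follows; the names entry, at, and dropExpired are hypothetical, and the heap rebuild that Refresh performs afterwards is left out.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical stand-in for slowQueryInfo.
type entry struct {
	start time.Time
	name  string
}

// at returns a fixed base time plus sec seconds, for reproducible examples.
func at(sec int) time.Time {
	return time.Unix(0, 0).Add(time.Duration(sec) * time.Second)
}

// dropExpired keeps, in place, the entries for which start+recent is still
// after now, and returns the shortened slice (sharing the original backing array).
func dropExpired(data []entry, recent time.Duration, now time.Time) []entry {
	kept := data[:0]
	for _, e := range data {
		if e.start.Add(recent).After(now) {
			kept = append(kept, e)
		}
	}
	return kept
}

func main() {
	data := []entry{{at(0), "a"}, {at(3), "b"}, {at(4), "c"}}
	kept := dropExpired(data, 3*time.Second, at(5))
	fmt.Println(len(kept)) // entries started at 3s and 4s survive a 3s window at 5s
}
```

Because filtering can remove arbitrary elements, the survivors no longer form a valid heap, which is why Refresh rebuilds it afterwards.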

type slowQueryInfo struct {
sql string
start time.Time
duration time.Duration
detail execdetails.ExecDetails
succ bool
connID uint64
txnTS uint64
user string
db string
tableIDs string
indexIDs string
}
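
The reviewer's question on Push, about implementing the heap interface, could look roughly like the sketch below, which uses the standard library's container/heap. The types durationHeap and topN are hypothetical and store bare durations instead of slowQueryInfo pointers; the sketch assumes n > 0 and is not the approach this diff takes.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// durationHeap is a min-heap of durations; the root is the smallest kept value.
type durationHeap []time.Duration

func (h durationHeap) Len() int            { return len(h) }
func (h durationHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h durationHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *durationHeap) Push(x interface{}) { *h = append(*h, x.(time.Duration)) }
func (h *durationHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// topN keeps the n largest durations seen so far (n must be positive).
type topN struct {
	n int
	h durationHeap
}

// Add inserts d while the heap is not full; afterwards it replaces the
// root only when d is larger than the current minimum.
func (t *topN) Add(d time.Duration) {
	if t.h.Len() < t.n {
		heap.Push(&t.h, d)
		return
	}
	if d > t.h[0] {
		t.h[0] = d
		heap.Fix(&t.h, 0) // restore heap order after overwriting the root
	}
}

// Min returns the smallest duration currently kept.
func (t *topN) Min() time.Duration { return t.h[0] }

func main() {
	t := &topN{n: 3}
	for _, ms := range []int{300, 100, 500, 200, 400} {
		t.Add(time.Duration(ms) * time.Millisecond)
	}
	fmt.Println(t.Min()) // smallest of the three largest values
}
```

The method is called Add here because heap.Interface already claims the name Push, which is the naming conflict the reviewer mentions.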
110 changes: 110 additions & 0 deletions domain/topn_slow_query_test.go
@@ -0,0 +1,110 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package domain

import (
"time"

. "github.com/pingcap/check"
)

var _ = Suite(&testTopNSlowQuerySuite{})

type testTopNSlowQuerySuite struct{}

func (t *testTopNSlowQuerySuite) TestPush(c *C) {
slowQuery := newTopNSlowQuery(10, 0)
// Insert data into the heap.
slowQuery.Push(&slowQueryInfo{duration: 300 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 400 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 500 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 600 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 700 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 800 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 900 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 1000 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 1100 * time.Millisecond})
slowQuery.Push(&slowQueryInfo{duration: 1200 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 300*time.Millisecond)
checkHeap(slowQuery, c)

// Update all data in the heap.
slowQuery.Push(&slowQueryInfo{duration: 1300 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 400*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1400 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 500*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1500 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 600*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1500 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 700*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1600 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 800*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1700 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 900*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1800 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 1000*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 1900 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 1100*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 2000 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 1200*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 2100 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond)
checkHeap(slowQuery, c)

// Data smaller than heap top will not be inserted.
slowQuery.Push(&slowQueryInfo{duration: 1200 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond)
slowQuery.Push(&slowQueryInfo{duration: 666 * time.Millisecond})
c.Assert(slowQuery.data[0].duration, Equals, 1300*time.Millisecond)
}

func (t *testTopNSlowQuerySuite) TestRefresh(c *C) {
now := time.Now()
slowQuery := newTopNSlowQuery(6, 3*time.Second)

slowQuery.Push(&slowQueryInfo{start: now, duration: 6})
slowQuery.Push(&slowQueryInfo{start: now.Add(1 * time.Second), duration: 5})
slowQuery.Push(&slowQueryInfo{start: now.Add(2 * time.Second), duration: 4})
slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3})
slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2})
c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond)

slowQuery.Refresh(now.Add(5 * time.Second))
c.Assert(slowQuery.offset, Equals, 2)
c.Assert(slowQuery.data[0].duration, Equals, 2*time.Nanosecond)

slowQuery.Push(&slowQueryInfo{start: now.Add(3 * time.Second), duration: 3})
slowQuery.Push(&slowQueryInfo{start: now.Add(4 * time.Second), duration: 2})
slowQuery.Push(&slowQueryInfo{start: now.Add(5 * time.Second), duration: 1})
slowQuery.Push(&slowQueryInfo{start: now.Add(6 * time.Second), duration: 0})
c.Assert(slowQuery.offset, Equals, 6)
c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond)

slowQuery.Refresh(now.Add(6 * time.Second))
c.Assert(slowQuery.offset, Equals, 4)
c.Assert(slowQuery.data[0].duration, Equals, 0*time.Nanosecond)
}

func checkHeap(q *topNSlowQuery, c *C) {
for i := 0; i < q.offset; i++ {
left := 2*i + 1
right := 2*i + 2
if left < q.offset {
c.Assert(q.data[i].duration, LessEqual, q.data[left].duration)
}
if right < q.offset {
c.Assert(q.data[i].duration, LessEqual, q.data[right].duration)
}
}
}
8 changes: 8 additions & 0 deletions executor/adapter.go
@@ -22,6 +22,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
@@ -371,6 +372,13 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
if !sessVars.InRestrictedSQL {
Review comment (Contributor): Only general SQL is logged? I would prefer to keep two heaps, one for general SQL and one for internal SQL.

Reply (Contributor Author): Internal SQL statements are always the same, so they would not give us much information.

var userString string
if user != nil {
userString = user.String()
}
domain.GetDomain(a.Ctx).LogTopNSlowQuery(sql, a.startTime, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, txnTS, userString, currentDB, tableIDs, indexIDs)
}
}
}
