Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan: remove mutexes #7468

Merged
merged 1 commit into from Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 25 additions & 30 deletions go/vt/vtgate/engine/primitive.go
Expand Up @@ -18,7 +18,7 @@ package engine

import (
"encoding/json"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vtgate/vindexes"
Expand Down Expand Up @@ -154,13 +154,12 @@ type (
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting

mu sync.Mutex // Mutex to protect the fields below
ExecCount uint64 // Count of times this plan was executed
ExecTime time.Duration // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
ShardQueries uint64 // Total number of shard queries
RowsReturned uint64 // Total number of rows
RowsAffected uint64 // Total number of rows
Errors uint64 // Total number of errors
}

// Match is used to check if a Primitive matches
Expand Down Expand Up @@ -199,26 +198,22 @@ type (

// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
p.mu.Lock()
p.ExecCount += execCount
p.ExecTime += execTime
p.ShardQueries += shardQueries
p.RowsAffected += rowsAffected
p.RowsReturned += rowsReturned
p.Errors += errors
p.mu.Unlock()
atomic.AddUint64(&p.ExecCount, execCount)
atomic.AddUint64(&p.ExecTime, uint64(execTime))
atomic.AddUint64(&p.ShardQueries, shardQueries)
atomic.AddUint64(&p.RowsAffected, rowsAffected)
atomic.AddUint64(&p.RowsReturned, rowsReturned)
atomic.AddUint64(&p.Errors, errors)
}

// Stats returns a copy of the plan execution statistics
func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rowsAffected, rowsReturned, errors uint64) {
p.mu.Lock()
execCount = p.ExecCount
execTime = p.ExecTime
shardQueries = p.ShardQueries
rowsAffected = p.RowsAffected
rowsReturned = p.RowsReturned
errors = p.Errors
p.mu.Unlock()
execCount = atomic.LoadUint64(&p.ExecCount)
execTime = time.Duration(atomic.LoadUint64(&p.ExecTime))
shardQueries = atomic.LoadUint64(&p.ShardQueries)
rowsAffected = atomic.LoadUint64(&p.RowsAffected)
rowsReturned = atomic.LoadUint64(&p.RowsReturned)
errors = atomic.LoadUint64(&p.Errors)
return
}

Expand Down Expand Up @@ -263,12 +258,12 @@ func (p *Plan) MarshalJSON() ([]byte, error) {
QueryType: p.Type.String(),
Original: p.Original,
Instructions: instructions,
ExecCount: p.ExecCount,
ExecTime: p.ExecTime,
ShardQueries: p.ShardQueries,
RowsAffected: p.RowsAffected,
RowsReturned: p.RowsReturned,
Errors: p.Errors,
ExecCount: atomic.LoadUint64(&p.ExecCount),
ExecTime: time.Duration(atomic.LoadUint64(&p.ExecTime)),
ShardQueries: atomic.LoadUint64(&p.ShardQueries),
RowsAffected: atomic.LoadUint64(&p.RowsAffected),
RowsReturned: atomic.LoadUint64(&p.RowsReturned),
Errors: atomic.LoadUint64(&p.Errors),
}
return json.Marshal(marshalPlan)
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/executor.go
Expand Up @@ -372,7 +372,7 @@ func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, lo
func (e *Executor) handleCommit(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts("Commit", "", "", int64(logStats.ShardQueries))

err := e.txConn.Commit(ctx, safeSession)
Expand All @@ -388,7 +388,7 @@ func (e *Executor) Commit(ctx context.Context, safeSession *SafeSession) error {
func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession, logStats *LogStats) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts("Rollback", "", "", int64(logStats.ShardQueries))
err := e.txConn.Rollback(ctx, safeSession)
logStats.CommitTime = time.Since(execStart)
Expand All @@ -398,7 +398,7 @@ func (e *Executor) handleRollback(ctx context.Context, safeSession *SafeSession,
func (e *Executor) handleSavepoint(ctx context.Context, safeSession *SafeSession, sql string, planType string, logStats *LogStats, nonTxResponse func(query string) (*sqltypes.Result, error), ignoreMaxMemoryRows bool) (*sqltypes.Result, error) {
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
logStats.ShardQueries = uint32(len(safeSession.ShardSessions))
logStats.ShardQueries = uint64(len(safeSession.ShardSessions))
e.updateQueryCounts(planType, "", "", int64(logStats.ShardQueries))
defer func() {
logStats.ExecuteTime = time.Since(execStart)
Expand Down Expand Up @@ -1622,7 +1622,7 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession,
}
logStats.RowsAffected = qr.RowsAffected

plan.AddStats(1, time.Since(logStats.StartTime), uint64(logStats.ShardQueries), qr.RowsAffected, uint64(len(qr.Rows)), errCount)
plan.AddStats(1, time.Since(logStats.StartTime), logStats.ShardQueries, qr.RowsAffected, uint64(len(qr.Rows)), errCount)

return qr.Fields, err
}
Expand Down
30 changes: 17 additions & 13 deletions go/vt/vtgate/executor_scatter_stats.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"html/template"
"net/http"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/logz"
Expand Down Expand Up @@ -74,16 +75,16 @@ func (e *Executor) gatherScatterStats() (statsResults, error) {
}
plans = append(plans, plan)
routes = append(routes, route)
scatterExecTime += plan.ExecTime
scatterCount += plan.ExecCount
scatterExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
scatterCount += atomic.LoadUint64(&plan.ExecCount)
}
if readOnly {
readOnlyTime += plan.ExecTime
readOnlyCount += plan.ExecCount
readOnlyTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
readOnlyCount += atomic.LoadUint64(&plan.ExecCount)
}

totalExecTime += plan.ExecTime
totalCount += plan.ExecCount
totalExecTime += time.Duration(atomic.LoadUint64(&plan.ExecTime))
totalCount += atomic.LoadUint64(&plan.ExecCount)
return true
})
if err != nil {
Expand All @@ -94,19 +95,22 @@ func (e *Executor) gatherScatterStats() (statsResults, error) {
resultItems := make([]*statsResultItem, len(plans))
for i, plan := range plans {
route := routes[i]
execCount := atomic.LoadUint64(&plan.ExecCount)
execTime := time.Duration(atomic.LoadUint64(&plan.ExecTime))

var avgTimePerQuery int64
if plan.ExecCount != 0 {
avgTimePerQuery = plan.ExecTime.Nanoseconds() / int64(plan.ExecCount)
if execCount != 0 {
avgTimePerQuery = execTime.Nanoseconds() / int64(execCount)
}
resultItems[i] = &statsResultItem{
Query: plan.Original,
AvgTimePerQuery: time.Duration(avgTimePerQuery),
PercentTimeOfReads: 100 * float64(plan.ExecTime) / float64(readOnlyTime),
PercentTimeOfScatters: 100 * float64(plan.ExecTime) / float64(scatterExecTime),
PercentCountOfReads: 100 * float64(plan.ExecCount) / float64(readOnlyCount),
PercentCountOfScatters: 100 * float64(plan.ExecCount) / float64(scatterCount),
PercentTimeOfReads: 100 * float64(execTime) / float64(readOnlyTime),
PercentTimeOfScatters: 100 * float64(execTime) / float64(scatterExecTime),
PercentCountOfReads: 100 * float64(execCount) / float64(readOnlyCount),
PercentCountOfScatters: 100 * float64(execCount) / float64(scatterCount),
From: route.Keyspace.Name + "." + route.TableName,
Count: plan.ExecCount,
Count: execCount,
}
}
result := statsResults{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/logstats.go
Expand Up @@ -47,7 +47,7 @@ type LogStats struct {
BindVariables map[string]*querypb.BindVariable
StartTime time.Time
EndTime time.Time
ShardQueries uint32
ShardQueries uint64
RowsAffected uint64
RowsReturned uint64
PlanTime time.Duration
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/queryz_test.go
Expand Up @@ -49,7 +49,7 @@ func TestQueryzHandler(t *testing.T) {
t.Fatalf("couldn't get plan from cache")
}
plan1 := result.(*engine.Plan)
plan1.ExecTime = 1 * time.Millisecond
plan1.ExecTime = uint64(1 * time.Millisecond)

// scatter
sql = "select id from user"
Expand All @@ -61,7 +61,7 @@ func TestQueryzHandler(t *testing.T) {
t.Fatalf("couldn't get plan from cache")
}
plan2 := result.(*engine.Plan)
plan2.ExecTime = 1 * time.Second
plan2.ExecTime = uint64(1 * time.Second)

sql = "insert into user (id, name) values (:id, :name)"
_, err = executorExec(executor, sql, map[string]*querypb.BindVariable{
Expand Down Expand Up @@ -91,8 +91,8 @@ func TestQueryzHandler(t *testing.T) {

require.NoError(t, err)

plan3.ExecTime = 100 * time.Millisecond
plan4.ExecTime = 200 * time.Millisecond
plan3.ExecTime = uint64(100 * time.Millisecond)
plan4.ExecTime = uint64(200 * time.Millisecond)

queryzHandler(executor, resp, req)
body, _ := ioutil.ReadAll(resp.Body)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/resolver.go
Expand Up @@ -74,7 +74,7 @@ func (res *Resolver) Execute(
return nil, err
}
if logStats != nil {
logStats.ShardQueries = uint32(len(rss))
logStats.ShardQueries = uint64(len(rss))
}

autocommit := len(rss) == 1 && canAutocommit && session.AutocommitApproval()
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Expand Up @@ -402,7 +402,7 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string]

// ExecuteMultiShard is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) {
atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries)))
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(queries)))
qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, autocommit, vc.ignoreMaxMemoryRows)

if errs == nil && rollbackOnError {
Expand Down Expand Up @@ -457,13 +457,13 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer

// StreamExeculteMulti is the streaming version of ExecuteMultiShard.
func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error {
atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(rss)))
atomic.AddUint64(&vc.logStats.ShardQueries, uint64(len(rss)))
return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback)
}

// ExecuteKeyspaceID is part of the engine.VCursor interface.
func (vc *vcursorImpl) ExecuteKeyspaceID(keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
atomic.AddUint32(&vc.logStats.ShardQueries, 1)
atomic.AddUint64(&vc.logStats.ShardQueries, 1)
rss, _, err := vc.ResolveDestinations(keyspace, nil, []key.Destination{key.DestinationKeyspaceID(ksid)})
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 25 additions & 29 deletions go/vt/vttablet/tabletserver/query_engine.go
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"

"context"
Expand Down Expand Up @@ -60,37 +61,32 @@ type TabletPlan struct {
Rules *rules.Rules
Authorized []*tableacl.ACLResult

mu sync.Mutex
QueryCount int64
Time time.Duration
MysqlTime time.Duration
RowsAffected int64
RowsReturned int64
ErrorCount int64
QueryCount uint64
Time uint64
MysqlTime uint64
RowsAffected uint64
RowsReturned uint64
ErrorCount uint64
}

// AddStats updates the stats for the current TabletPlan.
func (ep *TabletPlan) AddStats(queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
ep.mu.Lock()
ep.QueryCount += queryCount
ep.Time += duration
ep.MysqlTime += mysqlTime
ep.RowsAffected += rowsAffected
ep.RowsReturned += rowsReturned
ep.ErrorCount += errorCount
ep.mu.Unlock()
func (ep *TabletPlan) AddStats(queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) {
atomic.AddUint64(&ep.QueryCount, queryCount)
atomic.AddUint64(&ep.Time, uint64(duration))
atomic.AddUint64(&ep.MysqlTime, uint64(mysqlTime))
atomic.AddUint64(&ep.RowsAffected, rowsAffected)
atomic.AddUint64(&ep.RowsReturned, rowsReturned)
atomic.AddUint64(&ep.ErrorCount, errorCount)
}

// Stats returns the current stats of TabletPlan.
func (ep *TabletPlan) Stats() (queryCount int64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount int64) {
ep.mu.Lock()
queryCount = ep.QueryCount
duration = ep.Time
mysqlTime = ep.MysqlTime
rowsAffected = ep.RowsAffected
rowsReturned = ep.RowsReturned
errorCount = ep.ErrorCount
ep.mu.Unlock()
func (ep *TabletPlan) Stats() (queryCount uint64, duration, mysqlTime time.Duration, rowsAffected, rowsReturned, errorCount uint64) {
queryCount = atomic.LoadUint64(&ep.QueryCount)
duration = time.Duration(atomic.LoadUint64(&ep.Time))
mysqlTime = time.Duration(atomic.LoadUint64(&ep.MysqlTime))
rowsAffected = atomic.LoadUint64(&ep.RowsAffected)
rowsReturned = atomic.LoadUint64(&ep.RowsReturned)
errorCount = atomic.LoadUint64(&ep.ErrorCount)
return
}

Expand Down Expand Up @@ -431,12 +427,12 @@ type perQueryStats struct {
Query string
Table string
Plan planbuilder.PlanType
QueryCount int64
QueryCount uint64
Time time.Duration
MysqlTime time.Duration
RowsAffected int64
RowsReturned int64
ErrorCount int64
RowsAffected uint64
RowsReturned uint64
ErrorCount uint64
}

func (qe *QueryEngine) handleHTTPQueryPlans(response http.ResponseWriter, request *http.Request) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor.go
Expand Up @@ -87,7 +87,7 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) {
return
}
qre.tsv.qe.AddStats(planName, tableName, 1, duration, mysqlTime, int64(reply.RowsAffected), 0)
qre.plan.AddStats(1, duration, mysqlTime, int64(reply.RowsAffected), int64(len(reply.Rows)), 0)
qre.plan.AddStats(1, duration, mysqlTime, reply.RowsAffected, uint64(len(reply.Rows)), 0)
qre.logStats.RowsAffected = int(reply.RowsAffected)
qre.logStats.Rows = reply.Rows
qre.tsv.Stats().ResultHistogram.Add(int64(len(reply.Rows)))
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vttablet/tabletserver/queryz.go
Expand Up @@ -76,12 +76,12 @@ type queryzRow struct {
Query string
Table string
Plan planbuilder.PlanType
Count int64
Count uint64
tm time.Duration
mysqlTime time.Duration
RowsAffected int64
RowsReturned int64
Errors int64
RowsAffected uint64
RowsReturned uint64
Errors uint64
Color string
}

Expand Down Expand Up @@ -164,7 +164,7 @@ func queryzHandler(qe *QueryEngine, w http.ResponseWriter, r *http.Request) {
Value.Count, Value.tm, Value.mysqlTime, Value.RowsAffected, Value.RowsReturned, Value.Errors = plan.Stats()
var timepq time.Duration
if Value.Count != 0 {
timepq = time.Duration(int64(Value.tm) / Value.Count)
timepq = Value.tm / time.Duration(Value.Count)
}
if timepq < 10*time.Millisecond {
Value.Color = "low"
Expand Down