-
Notifications
You must be signed in to change notification settings - Fork 4
/
primitive.go
129 lines (108 loc) · 4.48 KB
/
primitive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package engine
import (
"sync"
"time"
"golang.org/x/net/context"
"gopkg.in/src-d/go-vitess.v1/sqltypes"
"gopkg.in/src-d/go-vitess.v1/vt/key"
"gopkg.in/src-d/go-vitess.v1/vt/srvtopo"
querypb "gopkg.in/src-d/go-vitess.v1/vt/proto/query"
)
// SeqVarName is a reserved bind var name for sequence values.
const SeqVarName = "__seq"
// ListVarName is a reserved bind var name for list vars.
// This is used for sending different IN clause values
// to different shards.
const ListVarName = "__vals"
// VCursor defines the interface the engine will use
// to execute routes.
type VCursor interface {
// Context returns the context of the current request.
Context() context.Context
// SetContextTimeout updates the context and sets a timeout.
SetContextTimeout(timeout time.Duration) context.CancelFunc
// RecordWarning stores the given warning in the current session
RecordWarning(warning *querypb.QueryWarning)
// V3 functions.
Execute(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error)
ExecuteAutocommit(method string, query string, bindvars map[string]*querypb.BindVariable, isDML bool) (*sqltypes.Result, error)
AutocommitApproval() bool
// Shard-level functions.
ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, isDML, canAutocommit bool) (*sqltypes.Result, []error)
ExecuteStandalone(query string, bindvars map[string]*querypb.BindVariable, rs *srvtopo.ResolvedShard) (*sqltypes.Result, error)
StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error
// Resolver methods, from key.Destination to srvtopo.ResolvedShard.
// Will replace all of the Topo functions.
ResolveDestinations(keyspace string, ids []*querypb.Value, destinations []key.Destination) ([]*srvtopo.ResolvedShard, [][]*querypb.Value, error)
}
// Plan represents the execution strategy for a given query.
// For now it's a simple wrapper around the real instructions.
// An instruction (aka Primitive) is typically a tree where
// each node does its part by combining the results of the
// sub-nodes.
type Plan struct {
// Original is the original query.
Original string `json:",omitempty"`
// Instructions contains the instructions needed to
// fulfil the query.
Instructions Primitive `json:",omitempty"`
// Mutex to protect the stats
mu sync.Mutex
// Count of times this plan was executed
ExecCount uint64
// Total execution time
ExecTime time.Duration
// Total number of shard queries
ShardQueries uint64
// Total number of rows
Rows uint64
// Total number of errors
Errors uint64
}
// AddStats updates the plan execution statistics
func (p *Plan) AddStats(execCount uint64, execTime time.Duration, shardQueries, rows, errors uint64) {
p.mu.Lock()
p.ExecCount += execCount
p.ExecTime += execTime
p.ShardQueries += shardQueries
p.Rows += rows
p.Errors += errors
p.mu.Unlock()
}
// Stats returns a copy of the plan execution statistics
func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries, rows, errors uint64) {
p.mu.Lock()
execCount = p.ExecCount
execTime = p.ExecTime
shardQueries = p.ShardQueries
rows = p.Rows
errors = p.Errors
p.mu.Unlock()
return
}
// Size is defined so that Plan can be given to a cache.LRUCache.
// VTGate needs to maintain a cache of plans. It uses LRUCache, which
// in turn requires its objects to define a Size function.
func (p *Plan) Size() int {
return 1
}
// Primitive is the interface that needs to be satisfied by
// all primitives of a plan.
type Primitive interface {
RouteType() string
Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error)
StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantields bool, callback func(*sqltypes.Result) error) error
GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error)
}