-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
executor.go
358 lines (294 loc) · 11.2 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
package query
import (
"context"
"errors"
"fmt"
"os"
"runtime/debug"
"strconv"
"time"
"github.com/influxdata/influxdb/v2"
iql "github.com/influxdata/influxdb/v2/influxql"
"github.com/influxdata/influxdb/v2/influxql/control"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxql"
"github.com/opentracing/opentracing-go/log"
"go.uber.org/zap"
)
var (
// ErrInvalidQuery is returned when executing an unknown query type.
ErrInvalidQuery = errors.New("invalid query")
// ErrNotExecuted is returned when a statement is not executed in a query.
// This can occur when a previous statement in the same query has errored.
ErrNotExecuted = errors.New("not executed")
// ErrQueryInterrupted is an error returned when the query is interrupted.
ErrQueryInterrupted = errors.New("query interrupted")
)
const (
// PanicCrashEnv is the environment variable that, when set, will prevent
// the handler from recovering any panics.
PanicCrashEnv = "INFLUXDB_PANIC_CRASH"
)
// ErrDatabaseNotFound returns a database not found error for the given database name.
func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) }
// ErrMaxSelectPointsLimitExceeded is an error when a query hits the maximum number of points.
func ErrMaxSelectPointsLimitExceeded(n, limit int) error {
return fmt.Errorf("max-select-point limit exceeed: (%d/%d)", n, limit)
}
// ErrMaxConcurrentQueriesLimitExceeded is an error when a query cannot be run
// because the maximum number of queries has been reached.
func ErrMaxConcurrentQueriesLimitExceeded(n, limit int) error {
return fmt.Errorf("max-concurrent-queries limit exceeded(%d, %d)", n, limit)
}
// Authorizer determines if certain operations are authorized.
type Authorizer interface {
// AuthorizeDatabase indicates whether the given Privilege is authorized on the database with the given name.
AuthorizeDatabase(p influxql.Privilege, name string) bool
// AuthorizeQuery returns an error if the query cannot be executed
AuthorizeQuery(database string, query *influxql.Query) error
// AuthorizeSeriesRead determines if a series is authorized for reading
AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool
// AuthorizeSeriesWrite determines if a series is authorized for writing
AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool
}
// OpenAuthorizer is the Authorizer used when authorization is disabled.
// It allows all operations.
type openAuthorizer struct{}
// OpenAuthorizer can be shared by all goroutines.
var OpenAuthorizer = openAuthorizer{}
// AuthorizeDatabase returns true to allow any operation on a database.
func (a openAuthorizer) AuthorizeDatabase(influxql.Privilege, string) bool { return true }
// AuthorizeSeriesRead allows access to any series.
func (a openAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
return true
}
// AuthorizeSeriesWrite allows access to any series.
func (a openAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
return true
}
// AuthorizeSeriesRead allows any query to execute.
func (a openAuthorizer) AuthorizeQuery(_ string, _ *influxql.Query) error { return nil }
// AuthorizerIsOpen returns true if the provided Authorizer is guaranteed to
// authorize anything. A nil Authorizer returns true for this function, and this
// function should be preferred over directly checking if an Authorizer is nil
// or not.
func AuthorizerIsOpen(a Authorizer) bool {
if u, ok := a.(interface{ AuthorizeUnrestricted() bool }); ok {
return u.AuthorizeUnrestricted()
}
return a == nil || a == OpenAuthorizer
}
// ExecutionOptions contains the options for executing a query.
type ExecutionOptions struct {
// OrgID is the organization for which this query is being executed.
OrgID influxdb.ID
// The database the query is running against.
Database string
// The retention policy the query is running against.
RetentionPolicy string
// How to determine whether the query is allowed to execute,
// what resources can be returned in SHOW queries, etc.
Authorizer Authorizer
// The requested maximum number of points to return in each result.
ChunkSize int
// If this query is being executed in a read-only context.
ReadOnly bool
// Node to execute on.
NodeID uint64
// Quiet suppresses non-essential output from the query executor.
Quiet bool
}
type (
iteratorsContextKey struct{}
)
// NewContextWithIterators returns a new context.Context with the *Iterators slice added.
// The query planner will add instances of AuxIterator to the Iterators slice.
func NewContextWithIterators(ctx context.Context, itr *Iterators) context.Context {
return context.WithValue(ctx, iteratorsContextKey{}, itr)
}
// StatementExecutor executes a statement within the Executor.
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
// results channel in the ExecutionContext.
ExecuteStatement(ctx context.Context, stmt influxql.Statement, ectx *ExecutionContext) error
}
// StatementNormalizer normalizes a statement before it is executed.
type StatementNormalizer interface {
// NormalizeStatement adds a default database and policy to the
// measurements in the statement.
NormalizeStatement(ctx context.Context, stmt influxql.Statement, database, retentionPolicy string, ectx *ExecutionContext) error
}
var (
nullNormalizer StatementNormalizer = &nullNormalizerImpl{}
)
type nullNormalizerImpl struct{}
func (n *nullNormalizerImpl) NormalizeStatement(ctx context.Context, stmt influxql.Statement, database, retentionPolicy string, ectx *ExecutionContext) error {
return nil
}
// Executor executes every statement in an Query.
type Executor struct {
// Used for executing a statement in the query.
StatementExecutor StatementExecutor
// StatementNormalizer normalizes a statement before it is executed.
StatementNormalizer StatementNormalizer
Metrics *control.ControllerMetrics
log *zap.Logger
}
// NewExecutor returns a new instance of Executor.
func NewExecutor(logger *zap.Logger, cm *control.ControllerMetrics) *Executor {
return &Executor{
StatementNormalizer: nullNormalizer,
Metrics: cm,
log: logger.With(zap.String("service", "query")),
}
}
// Close kills all running queries and prevents new queries from being attached.
func (e *Executor) Close() error {
return nil
}
// ExecuteQuery executes each statement within a query.
func (e *Executor) ExecuteQuery(ctx context.Context, query *influxql.Query, opt ExecutionOptions) (<-chan *Result, *iql.Statistics) {
results := make(chan *Result)
statistics := new(iql.Statistics)
go e.executeQuery(ctx, query, opt, results, statistics)
return results, statistics
}
func (e *Executor) executeQuery(ctx context.Context, query *influxql.Query, opt ExecutionOptions, results chan *Result, statistics *iql.Statistics) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer func() {
close(results)
span.Finish()
}()
defer e.recover(query, results)
gatherer := new(iql.StatisticsGatherer)
statusLabel := control.LabelSuccess
defer func(start time.Time) {
dur := time.Since(start)
e.Metrics.ExecutingDuration.WithLabelValues(statusLabel).Observe(dur.Seconds())
}(time.Now())
ectx := &ExecutionContext{StatisticsGatherer: gatherer, ExecutionOptions: opt}
// Setup the execution context that will be used when executing statements.
ectx.Results = results
var i int
LOOP:
for ; i < len(query.Statements); i++ {
ectx.statementID = i
stmt := query.Statements[i]
// If a default database wasn't passed in by the caller, check the statement.
defaultDB := opt.Database
if defaultDB == "" {
if s, ok := stmt.(influxql.HasDefaultDatabase); ok {
defaultDB = s.DefaultDatabase()
}
}
// Do not let queries manually use the system measurements. If we find
// one, return an error. This prevents a person from using the
// measurement incorrectly and causing a panic.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
for _, s := range stmt.Sources {
switch s := s.(type) {
case *influxql.Measurement:
if influxql.IsSystemName(s.Name) {
command := "the appropriate meta command"
switch s.Name {
case "_fieldKeys":
command = "SHOW FIELD KEYS"
case "_measurements":
command = "SHOW MEASUREMENTS"
case "_series":
command = "SHOW SERIES"
case "_tagKeys":
command = "SHOW TAG KEYS"
case "_tags":
command = "SHOW TAG VALUES"
}
_ = ectx.Send(ctx, &Result{
Err: fmt.Errorf("unable to use system source '%s': use %s instead", s.Name, command),
})
break LOOP
}
}
}
}
// Rewrite statements, if necessary.
// This can occur on meta read statements which convert to SELECT statements.
newStmt, err := RewriteStatement(stmt)
if err != nil {
_ = ectx.Send(ctx, &Result{Err: err})
break
}
stmt = newStmt
if err := e.StatementNormalizer.NormalizeStatement(ctx, stmt, defaultDB, opt.RetentionPolicy, ectx); err != nil {
if err := ectx.Send(ctx, &Result{Err: err}); err != nil {
return
}
break
}
statistics.StatementCount += 1
// Log each normalized statement.
if !ectx.Quiet {
e.log.Info("Executing query", zap.Stringer("query", stmt))
span.LogFields(log.String("normalized_query", stmt.String()))
}
gatherer.Reset()
stmtStart := time.Now()
// Send any other statements to the underlying statement executor.
err = tracing.LogError(span, e.StatementExecutor.ExecuteStatement(ctx, stmt, ectx))
stmtDur := time.Since(stmtStart)
stmtStats := gatherer.Statistics()
stmtStats.ExecuteDuration = stmtDur - stmtStats.PlanDuration
statistics.Add(stmtStats)
// Send an error for this result if it failed for some reason.
if err != nil {
statusLabel = control.LabelNotExecuted
e.Metrics.Requests.WithLabelValues(statusLabel).Inc()
_ = ectx.Send(ctx, &Result{
StatementID: i,
Err: err,
})
// Stop after the first error.
break
}
e.Metrics.Requests.WithLabelValues(statusLabel).Inc()
// Check if the query was interrupted during an uninterruptible statement.
if err := ctx.Err(); err != nil {
statusLabel = control.LabelInterruptedErr
e.Metrics.Requests.WithLabelValues(statusLabel).Inc()
break
}
}
// Send error results for any statements which were not executed.
for ; i < len(query.Statements)-1; i++ {
if err := ectx.Send(ctx, &Result{
StatementID: i,
Err: ErrNotExecuted,
}); err != nil {
break
}
}
}
// Determines if the Executor will recover any panics or let them crash
// the server.
var willCrash bool
func init() {
var err error
if willCrash, err = strconv.ParseBool(os.Getenv(PanicCrashEnv)); err != nil {
willCrash = false
}
}
func (e *Executor) recover(query *influxql.Query, results chan *Result) {
if err := recover(); err != nil {
e.log.Error(fmt.Sprintf("%s [panic:%s] %s", query.String(), err, debug.Stack()))
results <- &Result{
StatementID: -1,
Err: fmt.Errorf("%s [panic:%s]", query.String(), err),
}
if willCrash {
e.log.Error("\n\n=====\nAll goroutines now follow:")
buf := debug.Stack()
e.log.Error(fmt.Sprintf("%s", buf))
os.Exit(1)
}
}
}