-
Notifications
You must be signed in to change notification settings - Fork 0
/
executor.go
2556 lines (2317 loc) · 88.8 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2015 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package sql
import (
"fmt"
"net/url"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/net/context"
"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/jobs"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
// logStatementsExecuteEnabled causes the Executor to log executed
// statements and, if any, resulting errors.
var logStatementsExecuteEnabled = settings.RegisterBoolSetting(
"sql.trace.log_statement_execute",
"set to true to enable logging of executed statements",
false,
)
// ClusterOrganization is the organization name.
var ClusterOrganization = settings.RegisterStringSetting(
"cluster.organization",
"organization name",
"",
)
var errNoTransactionInProgress = errors.New("there is no transaction in progress")
var errStaleMetadata = errors.New("metadata is still stale")
var errTransactionInProgress = errors.New("there is already a transaction in progress")
func errWrongNumberOfPreparedStatements(n int) error {
return pgerror.NewErrorf(pgerror.CodeInvalidPreparedStatementDefinitionError,
"prepared statement had %d statements, expected 1", n)
}
const sqlTxnName string = "sql txn"
const sqlImplicitTxnName string = "sql txn implicit"
const metricsSampleInterval = 10 * time.Second
// Fully-qualified names for metrics.
var (
MetaTxnBegin = metric.Metadata{
Name: "sql.txn.begin.count",
Help: "Number of SQL transaction BEGIN statements"}
MetaTxnCommit = metric.Metadata{
Name: "sql.txn.commit.count",
Help: "Number of SQL transaction COMMIT statements"}
MetaTxnAbort = metric.Metadata{
Name: "sql.txn.abort.count",
Help: "Number of SQL transaction ABORT statements"}
MetaTxnRollback = metric.Metadata{
Name: "sql.txn.rollback.count",
Help: "Number of SQL transaction ROLLBACK statements"}
MetaSelect = metric.Metadata{
Name: "sql.select.count",
Help: "Number of SQL SELECT statements"}
MetaSQLExecLatency = metric.Metadata{
Name: "sql.exec.latency",
Help: "Latency of SQL statement execution"}
MetaSQLServiceLatency = metric.Metadata{
Name: "sql.service.latency",
Help: "Latency of SQL request execution"}
MetaDistSQLSelect = metric.Metadata{
Name: "sql.distsql.select.count",
Help: "Number of dist-SQL SELECT statements"}
MetaDistSQLExecLatency = metric.Metadata{
Name: "sql.distsql.exec.latency",
Help: "Latency of dist-SQL statement execution"}
MetaDistSQLServiceLatency = metric.Metadata{
Name: "sql.distsql.service.latency",
Help: "Latency of dist-SQL request execution"}
MetaUpdate = metric.Metadata{
Name: "sql.update.count",
Help: "Number of SQL UPDATE statements"}
MetaInsert = metric.Metadata{
Name: "sql.insert.count",
Help: "Number of SQL INSERT statements"}
MetaDelete = metric.Metadata{
Name: "sql.delete.count",
Help: "Number of SQL DELETE statements"}
MetaDdl = metric.Metadata{
Name: "sql.ddl.count",
Help: "Number of SQL DDL statements"}
MetaMisc = metric.Metadata{
Name: "sql.misc.count",
Help: "Number of other SQL statements"}
MetaQuery = metric.Metadata{
Name: "sql.query.count",
Help: "Number of SQL queries"}
)
type traceResult struct {
tag string
count int
}
func (r *traceResult) String() string {
if r.count < 0 {
return r.tag
}
return fmt.Sprintf("%s (%d results)", r.tag, r.count)
}
// ResultList represents a list of results for a list of SQL statements.
// There is one result object per SQL statement in the request.
type ResultList []Result
// StatementResults represents a list of results from running a batch of
// SQL statements, plus some meta info about the batch.
type StatementResults struct {
ResultList
// Indicates that after parsing, the request contained 0 non-empty statements.
Empty bool
}
// Close ensures that the resources claimed by the results are released.
func (s *StatementResults) Close(ctx context.Context) {
s.ResultList.Close(ctx)
}
// Close ensures that the resources claimed by the results are released.
func (rl ResultList) Close(ctx context.Context) {
for _, r := range rl {
r.Close(ctx)
}
}
// Result corresponds to the execution of a single SQL statement.
type Result struct {
Err error
// The type of statement that the result is for.
Type parser.StatementType
// The tag of the statement that the result is for.
PGTag string
// RowsAffected will be populated if the statement type is "RowsAffected".
RowsAffected int
// Columns will be populated if the statement type is "Rows". It will contain
// the names and types of the columns returned in the result set in the order
// specified in the SQL statement. The number of columns will equal the number
// of values in each Row.
Columns sqlbase.ResultColumns
// Rows will be populated if the statement type is "Rows". It will contain
// the result set of the result.
// TODO(nvanbenschoten): Can this be streamed from the planNode?
Rows *sqlbase.RowContainer
}
// Close ensures that the resources claimed by the result are released.
func (r *Result) Close(ctx context.Context) {
// The Rows pointer may be nil if the statement returned no rows or
// if an error occurred.
if r.Rows != nil {
r.Rows.Close(ctx)
}
}
// An Executor executes SQL statements.
// Executor is thread-safe.
type Executor struct {
cfg ExecutorConfig
stopper *stop.Stopper
reCache *parser.RegexpCache
virtualSchemas virtualSchemaHolder
// Transient stats.
SelectCount *metric.Counter
// The subset of SELECTs that are processed through DistSQL.
DistSQLSelectCount *metric.Counter
DistSQLExecLatency *metric.Histogram
SQLExecLatency *metric.Histogram
DistSQLServiceLatency *metric.Histogram
SQLServiceLatency *metric.Histogram
TxnBeginCount *metric.Counter
// txnCommitCount counts the number of times a COMMIT was attempted.
TxnCommitCount *metric.Counter
TxnAbortCount *metric.Counter
TxnRollbackCount *metric.Counter
UpdateCount *metric.Counter
InsertCount *metric.Counter
DeleteCount *metric.Counter
DdlCount *metric.Counter
MiscCount *metric.Counter
QueryCount *metric.Counter
// System Config and mutex.
systemConfig config.SystemConfig
// databaseCache is updated with systemConfigMu held, but read atomically in
// order to avoid recursive locking. See WaitForGossipUpdate.
databaseCache atomic.Value
systemConfigMu syncutil.Mutex
systemConfigCond *sync.Cond
distSQLPlanner *distSQLPlanner
// Application-level SQL statistics
sqlStats sqlStats
// Attempts to use unimplemented features.
unimplementedErrors struct {
syncutil.Mutex
counts map[string]int64
}
}
// NodeInfo contains metadata about the executing node and cluster.
type NodeInfo struct {
ClusterID func() uuid.UUID
NodeID *base.NodeIDContainer
AdminURL func() *url.URL
PGURL func(*url.Userinfo) (*url.URL, error)
}
// An ExecutorConfig encompasses the auxiliary objects and configuration
// required to create an executor.
// All fields holding a pointer or an interface are required to create
// a Executor; the rest will have sane defaults set if omitted.
type ExecutorConfig struct {
Settings *cluster.Settings
NodeInfo
AmbientCtx log.AmbientContext
DB *client.DB
Gossip *gossip.Gossip
DistSender *kv.DistSender
RPCContext *rpc.Context
LeaseManager *LeaseManager
Clock *hlc.Clock
DistSQLSrv *distsqlrun.ServerImpl
StatusServer serverpb.StatusServer
SessionRegistry *SessionRegistry
JobRegistry *jobs.Registry
TestingKnobs *ExecutorTestingKnobs
SchemaChangerTestingKnobs *SchemaChangerTestingKnobs
// HistogramWindowInterval is (server.Context).HistogramWindowInterval.
HistogramWindowInterval time.Duration
// Caches updated by DistSQL.
RangeDescriptorCache *kv.RangeDescriptorCache
LeaseHolderCache *kv.LeaseHolderCache
}
// Organization returns the value of cluster.organization.
func (ec *ExecutorConfig) Organization() string {
return ClusterOrganization.Get(&ec.Settings.SV)
}
var _ base.ModuleTestingKnobs = &ExecutorTestingKnobs{}
// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
func (*ExecutorTestingKnobs) ModuleTestingKnobs() {}
// StatementFilter is the type of callback that
// ExecutorTestingKnobs.StatementFilter takes.
type StatementFilter func(context.Context, string, ResultsWriter, error) error
// ExecutorTestingKnobs is part of the context used to control parts of the
// system during testing.
type ExecutorTestingKnobs struct {
// WaitForGossipUpdate causes metadata-mutating operations to wait
// for the new metadata to back-propagate through gossip.
WaitForGossipUpdate bool
// CheckStmtStringChange causes Executor.execStmtGroup to verify that executed
// statements are not modified during execution.
CheckStmtStringChange bool
// StatementFilter can be used to trap execution of SQL statements and
// optionally change their results. The filter function is invoked after each
// statement has been executed.
StatementFilter StatementFilter
// BeforePrepare is called by the Executor before preparing any statement. It
// gives access to the planner that will be used to do the prepare. If any of
// the return values are not nil, the values are used as the prepare results
// and normal preparation is short-circuited.
BeforePrepare func(ctx context.Context, stmt string, planner *planner) (*PreparedStatement, error)
// BeforeExecute is called by the Executor before plan execution. It is useful
// for synchronizing statement execution, such as with parallel statemets.
BeforeExecute func(ctx context.Context, stmt string, isParallel bool)
// AfterExecute is like StatementFilter, but it runs in the same goroutine of the
// statement.
AfterExecute func(
ctx context.Context, stmt string, res StatementResult, err error,
)
// DisableAutoCommit, if set, disables the auto-commit functionality of some
// SQL statements. That functionality allows some statements to commit
// directly when they're executed in an implicit SQL txn, without waiting for
// the Executor to commit the implicit txn.
// This has to be set in tests that need to abort such statements using a
// StatementFilter; otherwise, the statement commits immediately after
// execution so there'll be nothing left to abort by the time the filter runs.
DisableAutoCommit bool
// DistSQLPlannerKnobs are testing knobs for distSQLPlanner.
DistSQLPlannerKnobs DistSQLPlannerTestingKnobs
// BeforeAutoCommit is called when the Executor is about to commit the KV
// transaction after running a statement in an implicit transaction, allowing
// tests to inject errors into that commit.
// If an error is returned, that error will be considered the result of
// txn.Commit(), and the txn.Commit() call will not actually be
// made. If no error is returned, txn.Commit() is called normally.
//
// Note that this is not called if the SQL statement representing the implicit
// transaction has committed the KV txn itself (e.g. if it used the 1-PC
// optimization). This is only called when the Executor is the one doing the
// committing.
BeforeAutoCommit func(ctx context.Context, stmt string) error
}
// DistSQLPlannerTestingKnobs is used to control internals of the distSQLPlanner
// for testing purposes.
type DistSQLPlannerTestingKnobs struct {
// If OverrideSQLHealthCheck is set, we use this callback to get the health of
// a node.
OverrideHealthCheck func(node roachpb.NodeID, addrString string) error
}
// NewExecutor creates an Executor and registers a callback on the
// system config.
func NewExecutor(cfg ExecutorConfig, stopper *stop.Stopper) *Executor {
return &Executor{
cfg: cfg,
stopper: stopper,
reCache: parser.NewRegexpCache(512),
TxnBeginCount: metric.NewCounter(MetaTxnBegin),
TxnCommitCount: metric.NewCounter(MetaTxnCommit),
TxnAbortCount: metric.NewCounter(MetaTxnAbort),
TxnRollbackCount: metric.NewCounter(MetaTxnRollback),
SelectCount: metric.NewCounter(MetaSelect),
DistSQLSelectCount: metric.NewCounter(MetaDistSQLSelect),
// TODO(mrtracy): See HistogramWindowInterval in server/config.go for the 6x factor.
DistSQLExecLatency: metric.NewLatency(MetaDistSQLExecLatency,
6*metricsSampleInterval),
SQLExecLatency: metric.NewLatency(MetaSQLExecLatency,
6*metricsSampleInterval),
DistSQLServiceLatency: metric.NewLatency(MetaDistSQLServiceLatency,
6*metricsSampleInterval),
SQLServiceLatency: metric.NewLatency(MetaSQLServiceLatency,
6*metricsSampleInterval),
UpdateCount: metric.NewCounter(MetaUpdate),
InsertCount: metric.NewCounter(MetaInsert),
DeleteCount: metric.NewCounter(MetaDelete),
DdlCount: metric.NewCounter(MetaDdl),
MiscCount: metric.NewCounter(MetaMisc),
QueryCount: metric.NewCounter(MetaQuery),
sqlStats: sqlStats{st: cfg.Settings, apps: make(map[string]*appStats)},
}
}
// Start starts workers for the executor and initializes the distSQLPlanner.
func (e *Executor) Start(
ctx context.Context, startupMemMetrics *MemoryMetrics, nodeDesc roachpb.NodeDescriptor,
) {
ctx = e.AnnotateCtx(ctx)
log.Infof(ctx, "creating distSQLPlanner with address %s", nodeDesc.Address)
e.distSQLPlanner = newDistSQLPlanner(
distsqlrun.Version,
e.cfg.Settings,
nodeDesc,
e.cfg.RPCContext,
e.cfg.DistSQLSrv,
e.cfg.DistSender,
e.cfg.Gossip,
e.stopper,
e.cfg.TestingKnobs.DistSQLPlannerKnobs,
)
e.databaseCache.Store(newDatabaseCache(e.systemConfig))
e.systemConfigCond = sync.NewCond(&e.systemConfigMu)
gossipUpdateC := e.cfg.Gossip.RegisterSystemConfigChannel()
e.stopper.RunWorker(ctx, func(ctx context.Context) {
for {
select {
case <-gossipUpdateC:
sysCfg, _ := e.cfg.Gossip.GetSystemConfig()
e.updateSystemConfig(sysCfg)
case <-e.stopper.ShouldStop():
return
}
}
})
ctx = log.WithLogTag(ctx, "startup", nil)
startupSession := NewSession(ctx, SessionArgs{}, e, nil, startupMemMetrics)
startupSession.StartUnlimitedMonitor()
if err := e.virtualSchemas.init(ctx, startupSession.newPlanner(e, nil)); err != nil {
log.Fatal(ctx, err)
}
startupSession.Finish(e)
}
// GetVirtualTabler retrieves the VirtualTabler reference for this executor.
func (e *Executor) GetVirtualTabler() VirtualTabler {
return &e.virtualSchemas
}
// SetDistSQLSpanResolver changes the SpanResolver used for DistSQL. It is the
// caller's responsibility to make sure no queries are being run with DistSQL at
// the same time.
func (e *Executor) SetDistSQLSpanResolver(spanResolver distsqlplan.SpanResolver) {
e.distSQLPlanner.setSpanResolver(spanResolver)
}
// AnnotateCtx is a convenience wrapper; see AmbientContext.
func (e *Executor) AnnotateCtx(ctx context.Context) context.Context {
return e.cfg.AmbientCtx.AnnotateCtx(ctx)
}
// updateSystemConfig is called whenever the system config gossip entry is updated.
func (e *Executor) updateSystemConfig(cfg config.SystemConfig) {
e.systemConfigMu.Lock()
defer e.systemConfigMu.Unlock()
e.systemConfig = cfg
// The database cache gets reset whenever the system config changes.
e.databaseCache.Store(newDatabaseCache(cfg))
e.systemConfigCond.Broadcast()
}
// getDatabaseCache returns a database cache with a copy of the latest
// system config.
func (e *Executor) getDatabaseCache() *databaseCache {
if v := e.databaseCache.Load(); v != nil {
return v.(*databaseCache)
}
return nil
}
// Prepare returns the result types of the given statement. pinfo may
// contain partial type information for placeholders. Prepare will
// populate the missing types. The PreparedStatement is returned (or
// nil if there are no results).
func (e *Executor) Prepare(
stmt Statement, stmtStr string, session *Session, placeholderHints parser.PlaceholderTypes,
) (res *PreparedStatement, err error) {
session.resetForBatch(e)
sessionEventf(session, "preparing: %s", stmtStr)
defer session.maybeRecover("preparing", stmtStr)
prepared := &PreparedStatement{
TypeHints: placeholderHints,
portalNames: make(map[string]struct{}),
}
// We need a memory account available in order to prepare a statement, since we
// might need to allocate memory for constant-folded values in the process of
// planning it.
prepared.constantAcc = session.mon.MakeBoundAccount()
if stmt.AST == nil {
return prepared, nil
}
prepared.Statement = stmt.AST
if err := placeholderHints.ProcessPlaceholderAnnotations(stmt.AST); err != nil {
return nil, err
}
protoTS, err := isAsOf(session, stmt.AST, e.cfg.Clock.Now())
if err != nil {
return nil, err
}
// Prepare needs a transaction because it needs to retrieve db/table
// descriptors for type checking.
txn := session.TxnState.mu.txn
if txn == nil {
// The new txn need not be the same transaction used by statements following
// this prepare statement because it can only be used by prepare() to get a
// table lease that is eventually added to the session.
//
// TODO(vivek): perhaps we should be more consistent and update
// session.TxnState.mu.txn, but more thought needs to be put into whether that
// is really needed.
txn = client.NewTxn(e.cfg.DB, e.cfg.NodeID.Get())
if err := txn.SetIsolation(session.DefaultIsolationLevel); err != nil {
panic(fmt.Errorf("cannot set up txn for prepare %q: %v", stmtStr, err))
}
txn.Proto().OrigTimestamp = e.cfg.Clock.Now()
}
planner := session.newPlanner(e, txn)
planner.semaCtx.Placeholders.SetTypeHints(placeholderHints)
planner.evalCtx.PrepareOnly = true
planner.evalCtx.ActiveMemAcc = &prepared.constantAcc
if protoTS != nil {
planner.avoidCachedDescriptors = true
txn.SetFixedTimestamp(session.Ctx(), *protoTS)
}
if filter := e.cfg.TestingKnobs.BeforePrepare; filter != nil {
res, err := filter(session.Ctx(), stmtStr, planner)
if res != nil || err != nil {
return res, err
}
}
plan, err := planner.prepare(session.Ctx(), stmt.AST)
if err != nil {
return nil, err
}
if plan == nil {
return prepared, nil
}
defer plan.Close(session.Ctx())
prepared.Columns = planColumns(plan)
for _, c := range prepared.Columns {
if err := checkResultType(c.Typ); err != nil {
return nil, err
}
}
prepared.Types = planner.semaCtx.Placeholders.Types
return prepared, nil
}
// ExecuteStatementsBuffered executes the given statement(s), buffering them
// entirely in memory prior to returning a response. If there is an error then
// we return an empty StatementResults and the error.
//
// Note that we will only receive an error even if we run a successful statement
// followed by a statement which has an error then the caller will only receive
// the error, however the first statement will have been executed.
//
// If no error is returned, the caller has to call Close() on the returned
// StatementResults.
func (e *Executor) ExecuteStatementsBuffered(
session *Session, stmts string, pinfo *parser.PlaceholderInfo, expectedNumResults int,
) (StatementResults, error) {
b := newBufferedWriter(session.makeBoundAccount())
session.ResultsWriter = b
err := e.ExecuteStatements(session, stmts, pinfo)
res := b.results()
if err != nil {
res.Close(session.Ctx())
return StatementResults{}, err
}
for _, result := range res.ResultList {
if result.Err != nil {
res.Close(session.Ctx())
return StatementResults{}, errors.Errorf("%s", result.Err)
}
}
// This needs to be the last error check since in the case of an error during
// execution this would swallow the true error.
if a, e := len(res.ResultList), expectedNumResults; a != e {
res.Close(session.Ctx())
return StatementResults{}, errors.Errorf("number of results %d != expected %d", a, e)
}
return res, nil
}
// ExecuteStatements executes the given statement(s).
func (e *Executor) ExecuteStatements(
session *Session, stmts string, pinfo *parser.PlaceholderInfo,
) error {
session.resetForBatch(e)
session.phaseTimes[sessionStartBatch] = timeutil.Now()
defer session.maybeRecover("executing", stmts)
// If the Executor wants config updates to be blocked, then block them so
// that session.testingVerifyMetadataFn can later be run on a known version
// of the system config. The point is to lock the system config so that no
// gossip updates sneak in under us. We're then able to assert that the
// verify callback only succeeds after a gossip update.
//
// This lock does not change semantics. Even outside of tests, the Executor
// uses static systemConfig for a user request, so locking the Executor's
// systemConfig cannot change the semantics of the SQL operation being
// performed under lock.
//
// NB: The locking here implies that ExecuteStatements cannot be
// called recursively. So don't do that and don't try to adjust this locking
// to allow this method to be called recursively (sync.{Mutex,RWMutex} do not
// allow that).
if e.cfg.TestingKnobs.WaitForGossipUpdate {
e.systemConfigCond.L.Lock()
defer e.systemConfigCond.L.Unlock()
}
// Send the Request for SQL execution and set the application-level error
// for each result in the reply.
return e.execRequest(session, stmts, pinfo, copyMsgNone)
}
// ExecutePreparedStatement executes the given statement and returns a response.
func (e *Executor) ExecutePreparedStatement(
session *Session, stmt *PreparedStatement, pinfo *parser.PlaceholderInfo,
) error {
defer session.maybeRecover("executing", stmt.Str)
// Block system config updates. For more details, see the comment in
// ExecuteStatements.
if e.cfg.TestingKnobs.WaitForGossipUpdate {
e.systemConfigCond.L.Lock()
defer e.systemConfigCond.L.Unlock()
}
{
// No parsing is taking place, but we need to set the parsing phase time
// because the service latency is measured from
// phaseTimes[sessionStartParse].
now := timeutil.Now()
session.phaseTimes[sessionStartParse] = now
session.phaseTimes[sessionEndParse] = now
}
return e.execPrepared(session, stmt, pinfo)
}
// execPrepared executes a prepared statement. It returns an error if there
// is more than 1 result or the returned types differ from the prepared
// return types.
func (e *Executor) execPrepared(
session *Session, stmt *PreparedStatement, pinfo *parser.PlaceholderInfo,
) error {
if log.V(2) || logStatementsExecuteEnabled.Get(&e.cfg.Settings.SV) {
log.Infof(session.Ctx(), "execPrepared: %s", stmt.Str)
}
var stmts StatementList
if stmt.Statement != nil {
stmts = StatementList{{
AST: stmt.Statement,
ExpectedTypes: stmt.Columns,
}}
}
// Send the Request for SQL execution and set the application-level error
// for each result in the reply.
return e.execParsed(session, stmts, pinfo, copyMsgNone)
}
// CopyData adds data to the COPY buffer and executes if there are enough rows.
func (e *Executor) CopyData(session *Session, data string) error {
return e.execRequest(session, data, nil, copyMsgData)
}
// CopyDone executes the buffered COPY data.
func (e *Executor) CopyDone(session *Session) error {
return e.execRequest(session, "", nil, copyMsgDone)
}
// CopyEnd ends the COPY mode. Any buffered data is discarded.
func (s *Session) CopyEnd(ctx context.Context) {
s.copyFrom.Close(ctx)
s.copyFrom = nil
}
// execRequest executes the request in the provided Session.
// It parses the sql into statements, iterates through the statements, creates
// KV transactions and automatically retries them when possible, and executes
// the (synchronous attempt of) schema changes.
// It will accumulate a result in Response for each statement.
// It will resume a SQL transaction, if one was previously open for this client.
//
// execRequest handles the mismatch between the SQL interface that the Executor
// provides, based on statements being streamed from the client in the context
// of a session, and the KV client.Txn interface, based on (possibly-retriable)
// callbacks passed to be executed in the context of a transaction. Actual
// execution of statements in the context of a KV txn is delegated to
// runTxnAttempt().
func (e *Executor) execRequest(
session *Session, sql string, pinfo *parser.PlaceholderInfo, copymsg copyMsg,
) error {
var stmts StatementList
var err error
txnState := &session.TxnState
if log.V(2) || logStatementsExecuteEnabled.Get(&e.cfg.Settings.SV) {
log.Infof(session.Ctx(), "execRequest: %s", sql)
}
session.phaseTimes[sessionStartParse] = timeutil.Now()
if session.copyFrom != nil {
stmts, err = session.ProcessCopyData(session.Ctx(), sql, copymsg)
} else if copymsg != copyMsgNone {
err = fmt.Errorf("unexpected copy command")
} else {
var sl parser.StatementList
sl, err = parser.Parse(sql)
stmts = NewStatementList(sl)
}
session.phaseTimes[sessionEndParse] = timeutil.Now()
if err != nil {
if pgErr, ok := pgerror.GetPGCause(err); ok {
if pgErr.Code == pgerror.CodeFeatureNotSupportedError {
e.recordUnimplementedFeature(pgErr.InternalCommand)
}
}
if log.V(2) || logStatementsExecuteEnabled.Get(&e.cfg.Settings.SV) {
log.Infof(session.Ctx(), "execRequest: error: %v", err)
}
// A parse error occurred: we can't determine if there were multiple
// statements or only one, so just pretend there was one.
if txnState.mu.txn != nil {
// Rollback the txn.
err = txnState.updateStateAndCleanupOnErr(err, e)
}
return err
}
return e.execParsed(session, stmts, pinfo, copymsg)
}
// execParsed executes a batch of statements received as a unit from the client
// and returns query execution errors and communication errors.
func (e *Executor) execParsed(
session *Session, stmts StatementList, pinfo *parser.PlaceholderInfo, copymsg copyMsg,
) error {
var avoidCachedDescriptors bool
txnState := &session.TxnState
resultWriter := session.ResultsWriter
if len(stmts) == 0 {
resultWriter.SetEmptyQuery()
return nil
}
for len(stmts) > 0 {
// Each iteration consumes a transaction's worth of statements. Any error
// that is encountered resets stmts.
inTxn := txnState.State() != NoTxn
// Figure out the statements out of which we're going to try to consume
// this iteration. If we need to create an implicit txn, only one statement
// can be consumed.
stmtsToExec := stmts
// If protoTS is set, the transaction proto sets its Orig and Max timestamps
// to it each retry.
var protoTS *hlc.Timestamp
// autoCommit will be set if we're now starting an implicit transaction and
// thus should automatically commit after we've run the statement.
autoCommit := false
// If we're not in a transaction, then the next statement is part of a new
// transaction (implicit txn or explicit txn). We do the corresponding state
// reset.
if !inTxn {
// Detect implicit transactions - they need to be autocommitted.
if _, isBegin := stmts[0].AST.(*parser.BeginTransaction); !isBegin {
autoCommit = true
stmtsToExec = stmtsToExec[:1]
// Check for AS OF SYSTEM TIME. If it is present but not detected here,
// it will raise an error later on.
var err error
protoTS, err = isAsOf(session, stmtsToExec[0].AST, e.cfg.Clock.Now())
if err != nil {
return err
}
if protoTS != nil {
// When running AS OF SYSTEM TIME queries, we want to use the
// table descriptors from the specified time, and never lease
// anything. To do this, we pass down the avoidCachedDescriptors
// flag and set the transaction's timestamp to the specified time.
avoidCachedDescriptors = true
}
}
txnState.resetForNewSQLTxn(
e, session,
autoCommit, /* implicitTxn */
false, /* retryIntent */
e.cfg.Clock.PhysicalTime(), /* sqlTimestamp */
session.DefaultIsolationLevel,
roachpb.NormalUserPriority,
)
}
if txnState.State() == NoTxn {
panic("we failed to initialize a txn")
}
var err error
var remainingStmts StatementList
var transitionToOpen bool
remainingStmts, transitionToOpen, err = runWithAutoRetry(
e, session, stmtsToExec, !inTxn /* txnPrefix */, autoCommit,
protoTS, pinfo, avoidCachedDescriptors,
)
if autoCommit && txnState.State() != NoTxn {
log.Fatalf(session.Ctx(), "after an implicit txn, state should always be NoTxn, but found: %s",
txnState.State())
}
// If we've been told that we should move to Open, do it now.
if err == nil && (txnState.State() == AutoRetry) && transitionToOpen {
txnState.SetState(Open)
}
if err != nil && (log.V(2) || logStatementsExecuteEnabled.Get(&e.cfg.Settings.SV)) {
log.Infof(session.Ctx(), "execParsed: error: %v. state: %s", err, txnState.State())
}
// Sanity check about not leaving KV txns open on errors (other than
// retriable errors).
if err != nil && txnState.mu.txn != nil && !txnState.mu.txn.IsFinalized() {
if _, retryable := err.(*roachpb.HandledRetryableTxnError); !retryable {
log.Fatalf(session.Ctx(), "got a non-retryable error but the KV "+
"transaction is not finalized. TxnState: %s, err: %s\n"+
"err:%+v\n\ntxn: %s", txnState.State(), err, err, txnState.mu.txn.Proto())
}
}
if txnState.commitSeen && txnState.State() == Aborted {
// A COMMIT got an error (retryable or not); we'll move to state NoTxn.
// After we return a result for COMMIT (with the COMMIT pgwire tag), the
// user can't send any more commands.
txnState.resetStateAndTxn(NoTxn)
}
// If we're no longer in a transaction, close the transaction-scoped
// resources.
if txnState.State() == NoTxn {
txnState.finishSQLTxn(session)
}
// Verify that the metadata callback fails, if one was set. This is
// the precondition for validating that we need a gossip update for
// the callback to eventually succeed. Note that we are careful to
// check this just once per metadata callback (setting the callback
// clears session.verifyFnCheckedOnce).
if e.cfg.TestingKnobs.WaitForGossipUpdate {
// Turn off test verification of metadata changes made by the
// transaction if an error is seen during a transaction.
if err != nil {
session.testingVerifyMetadataFn = nil
}
if fn := session.testingVerifyMetadataFn; fn != nil && !session.verifyFnCheckedOnce {
if fn(e.systemConfig) == nil {
panic(fmt.Sprintf(
"expected %q (or the statements before them) to require a "+
"gossip update, but they did not", stmts))
}
session.verifyFnCheckedOnce = true
}
}
// If the txn is not in an "open" state any more, exec the schema changes.
// They'll short-circuit themselves if the mutation that queued them has
// been rolled back from the table descriptor.
if !txnState.TxnIsOpen() {
// Verify that metadata callback eventually succeeds, if one was
// set.
if e.cfg.TestingKnobs.WaitForGossipUpdate {
if fn := session.testingVerifyMetadataFn; fn != nil {
if !session.verifyFnCheckedOnce {
panic("initial state of the condition to verify was not checked")
}
for fn(e.systemConfig) != nil {
e.systemConfigCond.Wait()
}
session.testingVerifyMetadataFn = nil
}
}
// Release any leases the transaction(s) may have used.
session.tables.releaseTables(session.Ctx())
// Exec the schema changers (if the txn rolled back, the schema changers
// will short-circuit because the corresponding descriptor mutation is not
// found).
if err := txnState.schemaChangers.execSchemaChanges(session.Ctx(), e, session); err != nil {
return err
}
}
// Figure out what statements to run on the next iteration.
if err != nil {
return convertToErrWithPGCode(err)
} else if autoCommit {
stmts = stmts[1:]
} else {
stmts = remainingStmts
}
}
return nil
}
// runWithAutoRetry runs a prefix of stmtsToExec corresponding to the current
// transaction. It deals with auto-retries: when possible, the statements are
// retried in case of retriable errors. It also deals with "autoCommit" - if the
// current transaction only consists of a single statement (i.e. it's an
// "implicit txn"), then this function deal with committing the transaction and
// possibly retrying it if the commit gets a retriable error.
//
// Args:
// stmtsToExec: A prefix of these will be executed. The remaining ones will be
// returned as remainingStmts.
// txnPrefix: Set if stmtsToExec corresponds to the start of the current
// transaction. Used to trap nested BEGINs.
// autoCommit: If set, the transaction will be committed after running the
// statement. If set, stmtsToExec can only contain a single statement.
// If set, the transaction state will always be NoTxn when this function
// returns, regardless of errors.
// Errors encountered when committing are reported to the caller and are
// indistinguishable from errors encountered while running the query.
// protoTS: If not nil, the transaction proto sets its Orig and Max timestamps
// to it each retry.
//
// Returns:
// remainingStmts: all the statements that were not executed.
// transitionToOpen: specifies if the caller should move from state AutoRetry to
// state Open. This will be false if the state is not AutoRetry when this
// returns.
// err: An error that occurred while executing the queries.
func runWithAutoRetry(
e *Executor,
session *Session,
stmtsToExec StatementList,
txnPrefix bool,
autoCommit bool,
protoTS *hlc.Timestamp,
pinfo *parser.PlaceholderInfo,
avoidCachedDescriptors bool,
) (remainingStmts StatementList, transitionToOpen bool, _ error) {
if autoCommit && !txnPrefix {
log.Fatal(session.Ctx(), "autoCommit implies txnPrefix. "+
"How could the transaction have been started before an implicit txn?")
}
if autoCommit && len(stmtsToExec) != 1 {
log.Fatal(session.Ctx(), "Only one statement should be executed when "+
"autoCommit is set. stmtsToExec: %s", stmtsToExec)
}
txnState := &session.TxnState
origState := txnState.State()
// Whether or not we can do auto-retries depends on the state before we
// execute statements in the batch.
//
// TODO(andrei): It's unfortunate that we're keeping track of what the state
// was before running the statements. It'd be great if the state in which we
// find ourselves after running the statements told us if we can auto-retry.
// A way to do that is to introduce another state called AutoRestartWait,
// similar to RestartWait. We'd enter this state whenever we're in state
// AutoRetry and we get a retriable error.
txnCanBeAutoRetried := txnState.State() == AutoRetry
if autoCommit && !txnCanBeAutoRetried {
log.Fatalf(session.Ctx(), "autoCommit is only supported in the AutoRetry state. "+
"Current state: %s", txnState.State())
}