Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Produce query warnings for using features not supported when sharded #7538

Merged
merged 11 commits into from
Mar 22, 2021
5 changes: 5 additions & 0 deletions go/test/endtoend/vtgate/unsharded/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func TestMain(m *testing.M) {
}

// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"-warn_sharded_only=true"}
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestSelectIntoAndLoadFrom(t *testing.T) {
query = `load data infile '` + directory + `x2.txt' replace into table t1 Fields terminated by ';' optionally enclosed by '"' escaped by '\t' lines terminated by '\n'`
exec(t, conn, query)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)]]`)
assertMatches(t, conn, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("use of feature that is only supported in unsharded mode: LOAD")]]`)
}

func TestEmptyStatement(t *testing.T) {
Expand Down Expand Up @@ -313,6 +315,8 @@ func TestCallProcedure(t *testing.T) {
qr := exec(t, conn, `CALL sp_insert()`)
require.EqualValues(t, 1, qr.RowsAffected)

assertMatches(t, conn, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("'CALL' not supported in sharded mode")]]`)

_, err = conn.ExecuteFetch(`CALL sp_select()`, 1000, true)
require.Error(t, err)
require.Contains(t, err.Error(), "Multi-Resultset not supported in stored procedure")
Expand Down Expand Up @@ -356,6 +360,7 @@ func TestTempTable(t *testing.T) {
defer conn1.Close()

_ = exec(t, conn1, `create temporary table temp_t(id bigint primary key)`)
assertMatches(t, conn1, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("'temporary table' not supported in sharded mode")]]`)
_ = exec(t, conn1, `insert into temp_t(id) values (1),(2),(3)`)
assertMatches(t, conn1, `select id from temp_t order by id`, `[[INT64(1)] [INT64(2)] [INT64(3)]]`)
assertMatches(t, conn1, `select count(table_id) from information_schema.innodb_temp_table_info`, `[[INT64(1)]]`)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/proto/query/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {
vtgateSession.TargetString = opts.Target

streamSize := 10
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, streamSize, cache.DefaultConfig)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig)

return nil
}
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type (
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*querypb.QueryWarning // Warnings that need to be yielded every time this query runs

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
Expand Down
36 changes: 24 additions & 12 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ type Executor struct {

mu sync.Mutex
vschema *vindexes.VSchema
normalize bool
streamSize int
plans cache.Cache
vschemaStats *VSchemaStats

normalize bool
warnShardedOnly bool

vm *VSchemaManager
}

Expand All @@ -114,16 +116,17 @@ const pathScatterStats = "/debug/scatter_stats"
const pathVSchema = "/debug/vschema"

// NewExecutor creates a new Executor.
func NewExecutor(ctx context.Context, serv srvtopo.Server, cell string, resolver *Resolver, normalize bool, streamSize int, cacheCfg *cache.Config) *Executor {
func NewExecutor(ctx context.Context, serv srvtopo.Server, cell string, resolver *Resolver, normalize, warnOnShardedOnly bool, streamSize int, cacheCfg *cache.Config) *Executor {
e := &Executor{
systay marked this conversation as resolved.
Show resolved Hide resolved
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
streamSize: streamSize,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1035,7 +1038,7 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
bindVars = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly)
vcursor.SetIgnoreMaxMemoryRows(true)
switch stmtType {
case sqlparser.StmtStream:
Expand Down Expand Up @@ -1072,6 +1075,11 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
return err
}

// add any warnings that the planner wants to add
for _, warning := range plan.Warnings {
safeSession.RecordWarning(warning)
}

execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)

Expand Down Expand Up @@ -1264,6 +1272,10 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
if err != nil {
return nil, err
}

plan.Warnings = vcursor.warnings
vcursor.warnings = nil

if !skipQueryPlanCache && !sqlparser.SkipQueryPlanCacheDirective(statement) && sqlparser.CachePlan(statement) {
e.plans.Set(planKey, plan)
}
Expand Down Expand Up @@ -1548,7 +1560,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st
func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) ([]*querypb.Field, error) {
// V3 mode.
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly)
plan, err := e.getPlan(
vcursor,
query,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func createLegacyExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandb
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
return executor, sbc1, sbc2, sbclookup
Expand Down Expand Up @@ -433,7 +433,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
return executor, sbc1, sbc2, sbclookup
Expand All @@ -453,7 +453,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_MASTER, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)
return executor, sbc1, sbc2, sbclookup
}

Expand Down
22 changes: 11 additions & 11 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func TestSelectScatter(t *testing.T) {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)
logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

Expand Down Expand Up @@ -1061,7 +1061,7 @@ func TestSelectScatterPartial(t *testing.T) {
conns = append(conns, sbc)
}

executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)
logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

Expand Down Expand Up @@ -1118,7 +1118,7 @@ func TestStreamSelectScatter(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

sql := "select id from user"
result, err := executorStream(executor, sql)
Expand Down Expand Up @@ -1171,7 +1171,7 @@ func TestSelectScatterOrderBy(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, col2 from user order by col2 desc"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1236,7 +1236,7 @@ func TestSelectScatterOrderByVarChar(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, textcol from user order by textcol desc"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1296,7 +1296,7 @@ func TestStreamSelectScatterOrderBy(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select id, col from user order by col desc"
gotResult, err := executorStream(executor, query)
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func TestStreamSelectScatterOrderByVarChar(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select id, textcol from user order by textcol desc"
gotResult, err := executorStream(executor, query)
Expand Down Expand Up @@ -1410,7 +1410,7 @@ func TestSelectScatterAggregate(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col, sum(foo) from user group by col"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1467,7 +1467,7 @@ func TestStreamSelectScatterAggregate(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col, sum(foo) from user group by col"
gotResult, err := executorStream(executor, query)
Expand Down Expand Up @@ -1525,7 +1525,7 @@ func TestSelectScatterLimit(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, col2 from user order by col2 desc limit 3"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1591,7 +1591,7 @@ func TestStreamSelectScatterLimit(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, col2 from user order by col2 desc limit 3"
gotResult, err := executorStream(executor, query)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestStreamSQLSharded(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
Expand Down