Skip to content

Commit

Permalink
Merge pull request #7538 from planetscale/sharded-warnings
Browse files Browse the repository at this point in the history
Produce query warnings for using features not supported when sharded
  • Loading branch information
harshit-gangal committed Mar 22, 2021
2 parents 9dfb0c7 + f8d31b4 commit 9fd81c8
Show file tree
Hide file tree
Showing 20 changed files with 177 additions and 71 deletions.
5 changes: 5 additions & 0 deletions go/test/endtoend/vtgate/unsharded/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func TestMain(m *testing.M) {
}

// Start vtgate
clusterInstance.VtGateExtraArgs = []string{"-warn_sharded_only=true"}
if err := clusterInstance.StartVtgate(); err != nil {
log.Fatal(err.Error())
return 1
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestSelectIntoAndLoadFrom(t *testing.T) {
query = `load data infile '` + directory + `x2.txt' replace into table t1 Fields terminated by ';' optionally enclosed by '"' escaped by '\t' lines terminated by '\n'`
exec(t, conn, query)
assertMatches(t, conn, `select c1,c2,c3 from t1`, `[[INT64(300) INT64(100) INT64(300)]]`)
assertMatches(t, conn, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("use of feature that is only supported in unsharded mode: LOAD")]]`)
}

func TestEmptyStatement(t *testing.T) {
Expand Down Expand Up @@ -313,6 +315,8 @@ func TestCallProcedure(t *testing.T) {
qr := exec(t, conn, `CALL sp_insert()`)
require.EqualValues(t, 1, qr.RowsAffected)

assertMatches(t, conn, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("'CALL' not supported in sharded mode")]]`)

_, err = conn.ExecuteFetch(`CALL sp_select()`, 1000, true)
require.Error(t, err)
require.Contains(t, err.Error(), "Multi-Resultset not supported in stored procedure")
Expand Down Expand Up @@ -356,6 +360,7 @@ func TestTempTable(t *testing.T) {
defer conn1.Close()

_ = exec(t, conn1, `create temporary table temp_t(id bigint primary key)`)
assertMatches(t, conn1, "show warnings", `[[VARCHAR("Warning") UINT16(1235) VARCHAR("'temporary table' not supported in sharded mode")]]`)
_ = exec(t, conn1, `insert into temp_t(id) values (1),(2),(3)`)
assertMatches(t, conn1, `select id from temp_t order by id`, `[[INT64(1)] [INT64(2)] [INT64(3)]]`)
assertMatches(t, conn1, `select count(table_id) from information_schema.innodb_temp_table_info`, `[[INT64(1)]]`)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/proto/query/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {
vtgateSession.TargetString = opts.Target

streamSize := 10
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, streamSize, cache.DefaultConfig)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig)

return nil
}
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ type (
Original string // Original is the original query.
Instructions Primitive // Instructions contains the instructions needed to fulfil the query.
BindVarNeeds *sqlparser.BindVarNeeds // Stores BindVars needed to be provided as part of expression rewriting
Warnings []*querypb.QueryWarning // Warnings that need to be yielded every time this query runs

ExecCount uint64 // Count of times this plan was executed
ExecTime uint64 // Total execution time
Expand Down
36 changes: 24 additions & 12 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ type Executor struct {

mu sync.Mutex
vschema *vindexes.VSchema
normalize bool
streamSize int
plans cache.Cache
vschemaStats *VSchemaStats

normalize bool
warnShardedOnly bool

vm *VSchemaManager
}

Expand All @@ -114,16 +116,17 @@ const pathScatterStats = "/debug/scatter_stats"
const pathVSchema = "/debug/vschema"

// NewExecutor creates a new Executor.
func NewExecutor(ctx context.Context, serv srvtopo.Server, cell string, resolver *Resolver, normalize bool, streamSize int, cacheCfg *cache.Config) *Executor {
func NewExecutor(ctx context.Context, serv srvtopo.Server, cell string, resolver *Resolver, normalize, warnOnShardedOnly bool, streamSize int, cacheCfg *cache.Config) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
streamSize: streamSize,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1035,7 +1038,7 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
bindVars = make(map[string]*querypb.BindVariable)
}
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly)
vcursor.SetIgnoreMaxMemoryRows(true)
switch stmtType {
case sqlparser.StmtStream:
Expand Down Expand Up @@ -1072,6 +1075,11 @@ func (e *Executor) StreamExecute(ctx context.Context, method string, safeSession
return err
}

// add any warnings that the planner wants to add
for _, warning := range plan.Warnings {
safeSession.RecordWarning(warning)
}

execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)

Expand Down Expand Up @@ -1264,6 +1272,10 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
if err != nil {
return nil, err
}

plan.Warnings = vcursor.warnings
vcursor.warnings = nil

if !skipQueryPlanCache && !sqlparser.SkipQueryPlanCacheDirective(statement) && sqlparser.CachePlan(statement) {
e.plans.Set(planKey, plan)
}
Expand Down Expand Up @@ -1548,7 +1560,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st
func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *LogStats) ([]*querypb.Field, error) {
// V3 mode.
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv)
vcursor, _ := newVCursorImpl(ctx, safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly)
plan, err := e.getPlan(
vcursor,
query,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func createLegacyExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandb
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
return executor, sbc1, sbc2, sbclookup
Expand Down Expand Up @@ -433,7 +433,7 @@ func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn
bad.VSchema = badVSchema

getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
executor = NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
return executor, sbc1, sbc2, sbclookup
Expand All @@ -453,7 +453,7 @@ func createCustomExecutor(vschema string) (executor *Executor, sbc1, sbc2, sbclo
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_MASTER, true, 1, nil)
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema

executor = NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor = NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)
return executor, sbc1, sbc2, sbclookup
}

Expand Down
22 changes: 11 additions & 11 deletions go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func TestSelectScatter(t *testing.T) {
sbc := hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)
logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

Expand Down Expand Up @@ -1061,7 +1061,7 @@ func TestSelectScatterPartial(t *testing.T) {
conns = append(conns, sbc)
}

executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)
logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

Expand Down Expand Up @@ -1118,7 +1118,7 @@ func TestStreamSelectScatter(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

sql := "select id from user"
result, err := executorStream(executor, sql)
Expand Down Expand Up @@ -1171,7 +1171,7 @@ func TestSelectScatterOrderBy(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, col2 from user order by col2 desc"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1236,7 +1236,7 @@ func TestSelectScatterOrderByVarChar(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, textcol from user order by textcol desc"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1296,7 +1296,7 @@ func TestStreamSelectScatterOrderBy(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select id, col from user order by col desc"
gotResult, err := executorStream(executor, query)
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func TestStreamSelectScatterOrderByVarChar(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select id, textcol from user order by textcol desc"
gotResult, err := executorStream(executor, query)
Expand Down Expand Up @@ -1410,7 +1410,7 @@ func TestSelectScatterAggregate(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col, sum(foo) from user group by col"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1467,7 +1467,7 @@ func TestStreamSelectScatterAggregate(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col, sum(foo) from user group by col"
gotResult, err := executorStream(executor, query)
Expand Down Expand Up @@ -1525,7 +1525,7 @@ func TestSelectScatterLimit(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, col2 from user order by col2 desc limit 3"
gotResult, err := executorExec(executor, query, nil)
Expand Down Expand Up @@ -1591,7 +1591,7 @@ func TestStreamSelectScatterLimit(t *testing.T) {
}})
conns = append(conns, sbc)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

query := "select col1, col2 from user order by col2 desc limit 3"
gotResult, err := executorStream(executor, query)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestStreamSQLSharded(t *testing.T) {
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_MASTER, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, testBufferSize, cache.DefaultConfig)
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
Expand Down

0 comments on commit 9fd81c8

Please sign in to comment.