Skip to content

Commit

Permalink
Gen4: Add UPDATE planning (#9871)
Browse files Browse the repository at this point in the history
* gen4: analyze simple update query

Signed-off-by: Harshit Gangal <harshit@planetscale.com>

* feat: addition of gen4UpdateStmtPlanner in the gen4_planner (wip) [ci skip]

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: support sqlparser.Statement in the semantic analyzer (wip) [ci skip]

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: support unsharded update queries in gen4 without subqueries (wip) [ci skip]

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: support sharded update queries in gen4 (wip) [ci skip]

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: support routed tables for update queries in gen4 (wip) [ci skip]

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: analyzer nows support subquery in update [ci skip]

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: rewrite routed tables everywhere, not just on the updated table

Signed-off-by: Andres Taylor <andres@planetscale.com>

* test: updated test expectations to match the new error message

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: add support for update in the logical operator structure

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: add update to the physical operator list

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: produce a fuller physical operator for updates

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: represent UPDATE as a physical operator

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: handle single sharded update queries

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: consider all vindexes for routing

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: support Update's MultiShardAutocommit and QueryTimeout in Gen4

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: begin supporting subqueries in update

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: fix subquery support in gen4's update for sharded subquery

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: fix expected output for multicol vindex update

Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>

* feat: merge subqueries with update with literal values

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: make it possible to merge operators other than SELECT

Signed-off-by: Andres Taylor <andres@planetscale.com>

* chore: small fixes

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor: clean up file

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: add locking for the read part of updates with subqueries

Signed-off-by: Andres Taylor <andres@planetscale.com>

* refactor: clean up operator transformers

Signed-off-by: Andres Taylor <andres@planetscale.com>

* test: update tests now that update goes through semantic analysis

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: handle extracted subqueries when planning update evalengine expressions

Signed-off-by: Andres Taylor <andres@planetscale.com>

* test: turn off Gen4 vs V3 comparisons on everything except SELECT and UNION

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: allow explitly routed UPDATE queries

Signed-off-by: Andres Taylor <andres@planetscale.com>

* chore: update proto definitions

Signed-off-by: Andres Taylor <andres@planetscale.com>

* chore: addressed review comments

Signed-off-by: Andres Taylor <andres@planetscale.com>

* feat: fail on UPDATE with derived table with the proper error

Signed-off-by: Andres Taylor <andres@planetscale.com>

Co-authored-by: Florent Poinsard <florent.poinsard@outlook.fr>
Co-authored-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
3 people authored Apr 13, 2022
1 parent f328ae6 commit 085db75
Show file tree
Hide file tree
Showing 44 changed files with 2,138 additions and 225 deletions.
15 changes: 14 additions & 1 deletion go/test/endtoend/vtgate/queries/subquery/subquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,20 @@ func TestSubqueryInINClause(t *testing.T) {
mcmp, closer := start(t)
defer closer()

defer mcmp.Exec(`delete from t1`)
mcmp.Exec("insert into t1(id1, id2) values(0,0),(1,1)")
mcmp.AssertMatches(`SELECT id2 FROM t1 WHERE id1 IN (SELECT 1 FROM dual)`, `[[INT64(1)]]`)
}

func TestSubqueryInUpdate(t *testing.T) {
mcmp, closer := start(t)
defer closer()

conn := mcmp.VtConn

utils.Exec(t, conn, `insert into t1(id1, id2) values (1, 10), (2, 20), (3, 30), (4, 40), (5, 50)`)
utils.Exec(t, conn, `insert into t2(id3, id4) values (1, 3), (2, 4)`)
utils.AssertMatches(t, conn, `SELECT id2, keyspace_id FROM t1_id2_idx WHERE id2 IN (2,10)`, `[[INT64(10) VARBINARY("\x16k@\xb4J\xbaK\xd6")]]`)
utils.Exec(t, conn, `update /*vt+ PLANNER=gen4 */ t1 set id2 = (select count(*) from t2) where id1 = 1`)
utils.AssertMatches(t, conn, `SELECT id2 FROM t1 WHERE id1 = 1`, `[[INT64(2)]]`)
utils.AssertMatches(t, conn, `SELECT id2, keyspace_id FROM t1_id2_idx WHERE id2 IN (2,10)`, `[[INT64(2) VARBINARY("\x16k@\xb4J\xbaK\xd6")]]`)
}
3 changes: 3 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ func (c *ParsedComments) Prepend(comment string) Comments {
// IsSet checks the directive map for the named directive and returns
// true if the directive is set and has a true/false or 0/1 value
func (d CommentDirectives) IsSet(key string) bool {
if d == nil {
return false
}
val, ok := d[key]
if !ok {
return false
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtexplain/testdata/multi-output/updatesharded-output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ update user set name='alicia' where name='alice'
8 ks_sharded/c0-: commit

----------------------------------------------------------------------
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname=1 where nickname != ''
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info='apa' where name != 'hog'

1 ks_sharded/-40: begin
1 ks_sharded/-40: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/-40: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/-40: commit
1 ks_sharded/40-80: begin
1 ks_sharded/40-80: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/40-80: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/40-80: commit
1 ks_sharded/80-c0: begin
1 ks_sharded/80-c0: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/80-c0: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/80-c0: commit
1 ks_sharded/c0-: begin
1 ks_sharded/c0-: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/c0-: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/c0-: commit

----------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/testdata/updatesharded-queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ update user set name='alicia' where name='alice';
/* update name_info set has_nickname=1 where nickname != ''; */

/* scatter update autocommit */
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname=1 where nickname != '';
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info='apa' where name != 'hog';

/* multi-shard update by secondary vindex */
update user set pet='rover' where name='alice';
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type RoutingParameters struct {

// TargetDestination specifies an explicit target destination to send the query to.
// This will bypass the routing logic.
TargetDestination key.Destination
TargetDestination key.Destination // update `user[-]@replica`.user set ....

// Vindex specifies the vindex to be used.
Vindex vindexes.Vindex
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (upd *Update) TryExecute(vcursor VCursor, bindVars map[string]*querypb.Bind
switch upd.Opcode {
case Unsharded:
return upd.execUnsharded(vcursor, bindVars, rss)
case Equal, IN, Scatter, ByDestination:
case Equal, EqualUnique, IN, Scatter, ByDestination:
return upd.execMultiDestination(vcursor, bindVars, rss, upd.updateVindexEntries)
default:
// Unreachable.
Expand Down
34 changes: 34 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,40 @@ func TestUpdateEqual(t *testing.T) {
assertQueries(t, sbclookup, wantQueries)
}

func TestUpdateFromSubQuery(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

fields := []*querypb.Field{
{Name: "count(*)", Type: sqltypes.Int64},
}
sbc2.SetResults([]*sqltypes.Result{{
Fields: fields,
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(4),
}},
}})

// Update by primary vindex, but first execute subquery
_, err := executorExec(executor, "update user set a=(select count(*) from user where id = 3) where id = 1", nil)
require.NoError(t, err)
wantQueriesSbc1 := []*querypb.BoundQuery{{
Sql: "update `user` set a = :__sq1 where id = 1",
BindVariables: map[string]*querypb.BindVariable{
"__sq1": sqltypes.Int64BindVariable(4),
},
}}
wantQueriesSbc2 := []*querypb.BoundQuery{{
Sql: "select count(*) from `user` where id = 3 lock in share mode",
BindVariables: map[string]*querypb.BindVariable{},
}}
assertQueries(t, sbc1, wantQueriesSbc1)
assertQueries(t, sbc2, wantQueriesSbc2)
testQueryLog(t, logChan, "TestExecute", "UPDATE", "update user set a=(select count(*) from user where id = 3) where id = 1", 2)
}

func TestUpdateEqualWithWriteOnlyLookupUniqueVindex(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|lu_col|t2_lu_vdx", "int64|int64|int64|int64"),
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ func init() {
}

func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) {
// Use legacy gateway until we can rewrite these tests to use new tabletgateway
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
s := createSandbox("TestExecutor")
Expand Down
113 changes: 92 additions & 21 deletions go/vt/vtgate/planbuilder/abstract/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/semantics"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

type (
Expand Down Expand Up @@ -56,6 +57,12 @@ type (
// Clone creates a copy of the operator that can be updated without changing the original
Clone() PhysicalOperator
}

// IntroducesTable is used to make it possible to gather information about the table an operator introduces
IntroducesTable interface {
GetQTable() *QueryTable
GetVTable() *vindexes.Table
}
)

func getOperatorFromTableExpr(tableExpr sqlparser.TableExpr, semTable *semantics.SemTable) (LogicalOperator, error) {
Expand Down Expand Up @@ -83,7 +90,7 @@ func getOperatorFromTableExpr(tableExpr sqlparser.TableExpr, semTable *semantics
qg.Tables = append(qg.Tables, qt)
return qg, nil
case *sqlparser.DerivedTable:
inner, err := CreateOperatorFromAST(tbl.Select, semTable)
inner, err := CreateLogicalOperatorFromAST(tbl.Select, semTable)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,13 +165,15 @@ func getSelect(s sqlparser.SelectStatement) *sqlparser.Select {
}
}

// CreateOperatorFromAST creates an operator tree that represents the input SELECT or UNION query
func CreateOperatorFromAST(selStmt sqlparser.SelectStatement, semTable *semantics.SemTable) (op LogicalOperator, err error) {
// CreateLogicalOperatorFromAST creates an operator tree that represents the input SELECT or UNION query
func CreateLogicalOperatorFromAST(selStmt sqlparser.Statement, semTable *semantics.SemTable) (op LogicalOperator, err error) {
switch node := selStmt.(type) {
case *sqlparser.Select:
op, err = createOperatorFromSelect(node, semTable)
case *sqlparser.Union:
op, err = createOperatorFromUnion(node, semTable)
case *sqlparser.Update:
op, err = createOperatorFromUpdate(node, semTable)
default:
err = vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%T: operator not yet supported", selStmt)
}
Expand All @@ -175,7 +184,7 @@ func CreateOperatorFromAST(selStmt sqlparser.SelectStatement, semTable *semantic
}

func createOperatorFromUnion(node *sqlparser.Union, semTable *semantics.SemTable) (LogicalOperator, error) {
opLHS, err := CreateOperatorFromAST(node.Left, semTable)
opLHS, err := CreateLogicalOperatorFromAST(node.Left, semTable)
if err != nil {
return nil, err
}
Expand All @@ -184,7 +193,7 @@ func createOperatorFromUnion(node *sqlparser.Union, semTable *semantics.SemTable
if isRHSUnion {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "nesting of unions at the right-hand side is not yet supported")
}
opRHS, err := CreateOperatorFromAST(node.Right, semTable)
opRHS, err := CreateLogicalOperatorFromAST(node.Right, semTable)
if err != nil {
return nil, err
}
Expand All @@ -199,19 +208,9 @@ func createOperatorFromUnion(node *sqlparser.Union, semTable *semantics.SemTable

// createOperatorFromSelect creates an operator tree that represents the input SELECT query
func createOperatorFromSelect(sel *sqlparser.Select, semTable *semantics.SemTable) (LogicalOperator, error) {
var resultantOp *SubQuery
if len(semTable.SubqueryMap[sel]) > 0 {
resultantOp = &SubQuery{}
for _, sq := range semTable.SubqueryMap[sel] {
opInner, err := CreateOperatorFromAST(sq.Subquery.Select, semTable)
if err != nil {
return nil, err
}
resultantOp.Inner = append(resultantOp.Inner, &SubQueryInner{
ExtractedSubquery: sq,
Inner: opInner,
})
}
subq, err := createSubqueryFromStatement(sel, semTable)
if err != nil {
return nil, err
}
op, err := crossJoin(sel.From, semTable)
if err != nil {
Expand All @@ -227,11 +226,83 @@ func createOperatorFromSelect(sel *sqlparser.Select, semTable *semantics.SemTabl
addColumnEquality(semTable, expr)
}
}
if resultantOp == nil {
if subq == nil {
return op, nil
}
resultantOp.Outer = op
return resultantOp, nil
subq.Outer = op
return subq, nil
}

func createOperatorFromUpdate(updStmt *sqlparser.Update, semTable *semantics.SemTable) (LogicalOperator, error) {
subq, err := createSubqueryFromStatement(updStmt, semTable)
if err != nil {
return nil, err
}
alTbl, ok := updStmt.TableExprs[0].(*sqlparser.AliasedTableExpr)
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected AliasedTableExpr")
}
tblName, ok := alTbl.Expr.(sqlparser.TableName)
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected TableName")
}

tableID := semTable.TableSetFor(alTbl)
tableInfo, err := semTable.TableInfoFor(tableID)
if err != nil {
return nil, err
}

if tableInfo.IsInfSchema() {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "can't update information schema tables")
}

var predicates []sqlparser.Expr
if updStmt.Where != nil {
predicates = sqlparser.SplitAndExpression(nil, updStmt.Where.Expr)
}
qt := &QueryTable{
ID: tableID,
Alias: alTbl,
Table: tblName,
Predicates: predicates,
IsInfSchema: false,
}

assignments := make(map[string]sqlparser.Expr)
for _, set := range updStmt.Exprs {
assignments[set.Name.Name.String()] = set.Expr
}

u := &Update{
Table: qt,
Assignments: assignments,
AST: updStmt,
TableInfo: tableInfo,
}
if subq == nil {
return u, nil
}
subq.Outer = u
return subq, nil
}

func createSubqueryFromStatement(stmt sqlparser.Statement, semTable *semantics.SemTable) (*SubQuery, error) {
if len(semTable.SubqueryMap[stmt]) == 0 {
return nil, nil
}
subq := &SubQuery{}
for _, sq := range semTable.SubqueryMap[stmt] {
opInner, err := CreateLogicalOperatorFromAST(sq.Subquery.Select, semTable)
if err != nil {
return nil, err
}
subq.Inner = append(subq.Inner, &SubQueryInner{
ExtractedSubquery: sq,
Inner: opInner,
})
}
return subq, nil
}

func addColumnEquality(semTable *semantics.SemTable, expr sqlparser.Expr) {
Expand Down
13 changes: 10 additions & 3 deletions go/vt/vtgate/planbuilder/abstract/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ func TestOperator(t *testing.T) {
require.NoError(t, err)
tree, err := sqlparser.Parse(tc.query)
require.NoError(t, err)
stmt := tree.(sqlparser.SelectStatement)
stmt := tree.(sqlparser.Statement)
semTable, err := semantics.Analyze(stmt, "", si)
require.NoError(t, err)
optree, err := CreateOperatorFromAST(stmt, semTable)
optree, err := CreateLogicalOperatorFromAST(stmt, semTable)
require.NoError(t, err)
output := testString(optree)
if tc.expected != output {
Expand Down Expand Up @@ -164,8 +164,15 @@ func testString(op Operator) string {
dist = "(distinct)"
}
return fmt.Sprintf("Concatenate%s {\n%s\n}", dist, strings.Join(inners, ",\n"))
case *Update:
tbl := "table: " + op.Table.testString()
var assignments []string
for name, expr := range op.Assignments {
assignments = append(assignments, fmt.Sprintf("\t%s = %s", name, sqlparser.String(expr)))
}
return fmt.Sprintf("Update {\n\t%s\nassignments:\n%s\n}", tbl, strings.Join(assignments, "\n"))
}
return fmt.Sprintf("implement me: %T", op)
panic(fmt.Sprintf("%T", op))
}

func indent(s string) string {
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/planbuilder/abstract/operator_test_data.txt
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,35 @@ OuterJoin: {
}
Predicate: t.id = y.t_id
}

update tbl set col1 = apa
Update {
table: TableSet{0}:tbl
assignments:
col1 = apa
}

update tbl set col1 = 'apa', col2 = 1337 where id = 12 and name = 'gangal'
Update {
table: TableSet{0}:tbl where id = 12 and `name` = 'gangal'
assignments:
col1 = 'apa'
col2 = 1337
}

update user set u = 1 where id = (select id from user_extra where id = 42)
SubQuery: {
SubQueries: [
{
Type: PulloutValue
Query: QueryGraph: {
Tables:
TableSet{1}:user_extra where id = 42
}
}]
Outer: Update {
table: TableSet{0}:`user` where id = (select id from user_extra where id = 42)
assignments:
u = 1
}
}
Loading

0 comments on commit 085db75

Please sign in to comment.