Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gen4: Add UPDATE planning #9871

Merged
merged 35 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c3d45ab
gen4: analyze simple update query
harshit-gangal Mar 11, 2022
0cd46fc
feat: addition of gen4UpdateStmtPlanner in the gen4_planner (wip) [ci…
frouioui Apr 5, 2022
977a1df
feat: support sqlparser.Statement in the semantic analyzer (wip) [ci …
frouioui Apr 5, 2022
3c40a1e
feat: support unsharded update queries in gen4 without subqueries (wi…
frouioui Apr 5, 2022
946085f
feat: support sharded update queries in gen4 (wip) [ci skip]
frouioui Apr 5, 2022
e956085
feat: support routed tables for update queries in gen4 (wip) [ci skip]
frouioui Apr 5, 2022
30215c3
feat: analyzer nows support subquery in update [ci skip]
frouioui Apr 6, 2022
13673f6
feat: rewrite routed tables everywhere, not just on the updated table
systay Apr 6, 2022
c97f166
test: updated test expectations to match the new error message
systay Apr 6, 2022
765f155
feat: add support for update in the logical operator structure
systay Apr 6, 2022
29d8854
feat: add update to the physical operator list
systay Apr 6, 2022
148c3e3
feat: produce a fuller physical operator for updates
systay Apr 7, 2022
4504582
feat: represent UPDATE as a physical operator
systay Apr 8, 2022
63b1db2
feat: handle single sharded update queries
systay Apr 8, 2022
1bba4d6
feat: consider all vindexes for routing
systay Apr 8, 2022
50a81b9
feat: support Update's MultiShardAutocommit and QueryTimeout in Gen4
frouioui Apr 8, 2022
84d9e78
feat: begin supporting subqueries in update
frouioui Apr 8, 2022
a0213ec
feat: fix subquery support in gen4's update for sharded subquery
frouioui Apr 8, 2022
cfad564
feat: fix expected output for multicol vindex update
frouioui Apr 8, 2022
cd7b2ff
feat: merge subqueries with update with literal values
systay Apr 8, 2022
ee7d40a
feat: make it possible to merge operators other than SELECT
systay Apr 10, 2022
9931b09
Merge branch main into gen4-update
systay Apr 10, 2022
d80a4c7
chore: small fixes
systay Apr 11, 2022
088bab6
refactor: clean up file
systay Apr 11, 2022
92c693f
feat: add locking for the read part of updates with subqueries
systay Apr 11, 2022
66ce4b3
refactor: clean up operator transformers
systay Apr 11, 2022
ce3d73c
test: update tests now that update goes through semantic analysis
systay Apr 11, 2022
8d863b7
feat: handle extracted subqueries when planning update evalengine exp…
systay Apr 11, 2022
ae8a27f
test: turn off Gen4 vs V3 comparisons on everything except SELECT and…
systay Apr 12, 2022
2c49426
Merge branch main into gen4-update
systay Apr 12, 2022
d9ae23c
feat: allow explitly routed UPDATE queries
systay Apr 12, 2022
4b42831
chore: update proto definitions
systay Apr 12, 2022
498ea25
chore: addressed review comments
systay Apr 13, 2022
42e2f59
feat: fail on UPDATE with derived table with the proper error
systay Apr 13, 2022
9c7c8ed
Merge branch main into gen4-update
systay Apr 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion go/test/endtoend/vtgate/queries/subquery/subquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,20 @@ func TestSubqueryInINClause(t *testing.T) {
mcmp, closer := start(t)
defer closer()

defer mcmp.Exec(`delete from t1`)
mcmp.Exec("insert into t1(id1, id2) values(0,0),(1,1)")
mcmp.AssertMatches(`SELECT id2 FROM t1 WHERE id1 IN (SELECT 1 FROM dual)`, `[[INT64(1)]]`)
}

func TestSubqueryInUpdate(t *testing.T) {
mcmp, closer := start(t)
defer closer()

conn := mcmp.VtConn

utils.Exec(t, conn, `insert into t1(id1, id2) values (1, 10), (2, 20), (3, 30), (4, 40), (5, 50)`)
utils.Exec(t, conn, `insert into t2(id3, id4) values (1, 3), (2, 4)`)
utils.AssertMatches(t, conn, `SELECT id2, keyspace_id FROM t1_id2_idx WHERE id2 IN (2,10)`, `[[INT64(10) VARBINARY("\x16k@\xb4J\xbaK\xd6")]]`)
utils.Exec(t, conn, `update /*vt+ PLANNER=gen4 */ t1 set id2 = (select count(*) from t2) where id1 = 1`)
utils.AssertMatches(t, conn, `SELECT id2 FROM t1 WHERE id1 = 1`, `[[INT64(2)]]`)
utils.AssertMatches(t, conn, `SELECT id2, keyspace_id FROM t1_id2_idx WHERE id2 IN (2,10)`, `[[INT64(2) VARBINARY("\x16k@\xb4J\xbaK\xd6")]]`)
}
3 changes: 3 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,9 @@ func (c *ParsedComments) Prepend(comment string) Comments {
// IsSet checks the directive map for the named directive and returns
// true if the directive is set and has a true/false or 0/1 value
func (d CommentDirectives) IsSet(key string) bool {
if d == nil {
return false
}
val, ok := d[key]
if !ok {
return false
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtexplain/testdata/multi-output/updatesharded-output.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,19 @@ update user set name='alicia' where name='alice'
8 ks_sharded/c0-: commit

----------------------------------------------------------------------
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname=1 where nickname != ''
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info='apa' where name != 'hog'

1 ks_sharded/-40: begin
1 ks_sharded/-40: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/-40: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/-40: commit
1 ks_sharded/40-80: begin
1 ks_sharded/40-80: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/40-80: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/40-80: commit
1 ks_sharded/80-c0: begin
1 ks_sharded/80-c0: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/80-c0: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/80-c0: commit
1 ks_sharded/c0-: begin
1 ks_sharded/c0-: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname = 1 where nickname != '' limit 10001
1 ks_sharded/c0-: update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info = 'apa' where `name` != 'hog' limit 10001
1 ks_sharded/c0-: commit

----------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/testdata/updatesharded-queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ update user set name='alicia' where name='alice';
/* update name_info set has_nickname=1 where nickname != ''; */

/* scatter update autocommit */
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set has_nickname=1 where nickname != '';
update /*vt+ MULTI_SHARD_AUTOCOMMIT=1 */ name_info set info='apa' where name != 'hog';

/* multi-shard update by secondary vindex */
update user set pet='rover' where name='alice';
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type RoutingParameters struct {

// TargetDestination specifies an explicit target destination to send the query to.
// This will bypass the routing logic.
TargetDestination key.Destination
TargetDestination key.Destination // update `user[-]@replica`.user set ....

// Vindex specifies the vindex to be used.
Vindex vindexes.Vindex
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (upd *Update) TryExecute(vcursor VCursor, bindVars map[string]*querypb.Bind
switch upd.Opcode {
case Unsharded:
return upd.execUnsharded(vcursor, bindVars, rss)
case Equal, IN, Scatter, ByDestination:
case Equal, EqualUnique, IN, Scatter, ByDestination:
return upd.execMultiDestination(vcursor, bindVars, rss, upd.updateVindexEntries)
default:
// Unreachable.
Expand Down
34 changes: 34 additions & 0 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,40 @@ func TestUpdateEqual(t *testing.T) {
assertQueries(t, sbclookup, wantQueries)
}

func TestUpdateFromSubQuery(t *testing.T) {
executor, sbc1, sbc2, _ := createExecutorEnv()

logChan := QueryLogger.Subscribe("Test")
defer QueryLogger.Unsubscribe(logChan)

fields := []*querypb.Field{
{Name: "count(*)", Type: sqltypes.Int64},
}
sbc2.SetResults([]*sqltypes.Result{{
Fields: fields,
Rows: [][]sqltypes.Value{{
sqltypes.NewInt64(4),
}},
}})

// Update by primary vindex, but first execute subquery
_, err := executorExec(executor, "update user set a=(select count(*) from user where id = 3) where id = 1", nil)
require.NoError(t, err)
wantQueriesSbc1 := []*querypb.BoundQuery{{
Sql: "update `user` set a = :__sq1 where id = 1",
BindVariables: map[string]*querypb.BindVariable{
"__sq1": sqltypes.Int64BindVariable(4),
},
}}
wantQueriesSbc2 := []*querypb.BoundQuery{{
Sql: "select count(*) from `user` where id = 3 lock in share mode",
systay marked this conversation as resolved.
Show resolved Hide resolved
BindVariables: map[string]*querypb.BindVariable{},
}}
assertQueries(t, sbc1, wantQueriesSbc1)
assertQueries(t, sbc2, wantQueriesSbc2)
testQueryLog(t, logChan, "TestExecute", "UPDATE", "update user set a=(select count(*) from user where id = 3) where id = 1", 2)
}

func TestUpdateEqualWithWriteOnlyLookupUniqueVindex(t *testing.T) {
res := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|wo_lu_col|lu_col|t2_lu_vdx", "int64|int64|int64|int64"),
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/executor_framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ func init() {
}

func createExecutorEnv() (executor *Executor, sbc1, sbc2, sbclookup *sandboxconn.SandboxConn) {
// Use legacy gateway until we can rewrite these tests to use new tabletgateway
cell := "aa"
hc := discovery.NewFakeHealthCheck(nil)
s := createSandbox("TestExecutor")
Expand Down
113 changes: 92 additions & 21 deletions go/vt/vtgate/planbuilder/abstract/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/semantics"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

type (
Expand Down Expand Up @@ -56,6 +57,12 @@ type (
// Clone creates a copy of the operator that can be updated without changing the original
Clone() PhysicalOperator
}

// IntroducesTable is used to make it possible to gather information about the table an operator introduces
IntroducesTable interface {
GetQTable() *QueryTable
GetVTable() *vindexes.Table
}
)

func getOperatorFromTableExpr(tableExpr sqlparser.TableExpr, semTable *semantics.SemTable) (LogicalOperator, error) {
Expand Down Expand Up @@ -83,7 +90,7 @@ func getOperatorFromTableExpr(tableExpr sqlparser.TableExpr, semTable *semantics
qg.Tables = append(qg.Tables, qt)
return qg, nil
case *sqlparser.DerivedTable:
inner, err := CreateOperatorFromAST(tbl.Select, semTable)
inner, err := CreateLogicalOperatorFromAST(tbl.Select, semTable)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -158,13 +165,15 @@ func getSelect(s sqlparser.SelectStatement) *sqlparser.Select {
}
}

// CreateOperatorFromAST creates an operator tree that represents the input SELECT or UNION query
func CreateOperatorFromAST(selStmt sqlparser.SelectStatement, semTable *semantics.SemTable) (op LogicalOperator, err error) {
// CreateLogicalOperatorFromAST creates an operator tree that represents the input SELECT or UNION query
func CreateLogicalOperatorFromAST(selStmt sqlparser.Statement, semTable *semantics.SemTable) (op LogicalOperator, err error) {
switch node := selStmt.(type) {
case *sqlparser.Select:
op, err = createOperatorFromSelect(node, semTable)
case *sqlparser.Union:
op, err = createOperatorFromUnion(node, semTable)
case *sqlparser.Update:
op, err = createOperatorFromUpdate(node, semTable)
default:
err = vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%T: operator not yet supported", selStmt)
}
Expand All @@ -175,7 +184,7 @@ func CreateOperatorFromAST(selStmt sqlparser.SelectStatement, semTable *semantic
}

func createOperatorFromUnion(node *sqlparser.Union, semTable *semantics.SemTable) (LogicalOperator, error) {
opLHS, err := CreateOperatorFromAST(node.Left, semTable)
opLHS, err := CreateLogicalOperatorFromAST(node.Left, semTable)
if err != nil {
return nil, err
}
Expand All @@ -184,7 +193,7 @@ func createOperatorFromUnion(node *sqlparser.Union, semTable *semantics.SemTable
if isRHSUnion {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "nesting of unions at the right-hand side is not yet supported")
}
opRHS, err := CreateOperatorFromAST(node.Right, semTable)
opRHS, err := CreateLogicalOperatorFromAST(node.Right, semTable)
if err != nil {
return nil, err
}
Expand All @@ -199,19 +208,9 @@ func createOperatorFromUnion(node *sqlparser.Union, semTable *semantics.SemTable

// createOperatorFromSelect creates an operator tree that represents the input SELECT query
func createOperatorFromSelect(sel *sqlparser.Select, semTable *semantics.SemTable) (LogicalOperator, error) {
var resultantOp *SubQuery
if len(semTable.SubqueryMap[sel]) > 0 {
resultantOp = &SubQuery{}
for _, sq := range semTable.SubqueryMap[sel] {
opInner, err := CreateOperatorFromAST(sq.Subquery.Select, semTable)
if err != nil {
return nil, err
}
resultantOp.Inner = append(resultantOp.Inner, &SubQueryInner{
ExtractedSubquery: sq,
Inner: opInner,
})
}
subq, err := createSubqueryFromStatement(sel, semTable)
if err != nil {
return nil, err
}
op, err := crossJoin(sel.From, semTable)
if err != nil {
Expand All @@ -227,11 +226,83 @@ func createOperatorFromSelect(sel *sqlparser.Select, semTable *semantics.SemTabl
addColumnEquality(semTable, expr)
}
}
if resultantOp == nil {
if subq == nil {
return op, nil
}
resultantOp.Outer = op
return resultantOp, nil
subq.Outer = op
return subq, nil
}

func createOperatorFromUpdate(updStmt *sqlparser.Update, semTable *semantics.SemTable) (LogicalOperator, error) {
subq, err := createSubqueryFromStatement(updStmt, semTable)
if err != nil {
return nil, err
}
alTbl, ok := updStmt.TableExprs[0].(*sqlparser.AliasedTableExpr)
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected AliasedTableExpr")
}
tblName, ok := alTbl.Expr.(sqlparser.TableName)
if !ok {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected TableName")
}

tableID := semTable.TableSetFor(alTbl)
tableInfo, err := semTable.TableInfoFor(tableID)
if err != nil {
return nil, err
}

if tableInfo.IsInfSchema() {
return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "can't update information schema tables")
}

var predicates []sqlparser.Expr
if updStmt.Where != nil {
predicates = sqlparser.SplitAndExpression(nil, updStmt.Where.Expr)
}
qt := &QueryTable{
ID: tableID,
Alias: alTbl,
Table: tblName,
Predicates: predicates,
IsInfSchema: false,
}

assignments := make(map[string]sqlparser.Expr)
for _, set := range updStmt.Exprs {
assignments[set.Name.Name.String()] = set.Expr
}

u := &Update{
Table: qt,
Assignments: assignments,
AST: updStmt,
TableInfo: tableInfo,
}
if subq == nil {
return u, nil
}
subq.Outer = u
return subq, nil
}

func createSubqueryFromStatement(stmt sqlparser.Statement, semTable *semantics.SemTable) (*SubQuery, error) {
if len(semTable.SubqueryMap[stmt]) == 0 {
return nil, nil
}
subq := &SubQuery{}
for _, sq := range semTable.SubqueryMap[stmt] {
opInner, err := CreateLogicalOperatorFromAST(sq.Subquery.Select, semTable)
if err != nil {
return nil, err
}
subq.Inner = append(subq.Inner, &SubQueryInner{
ExtractedSubquery: sq,
Inner: opInner,
})
}
return subq, nil
}

func addColumnEquality(semTable *semantics.SemTable, expr sqlparser.Expr) {
Expand Down
13 changes: 10 additions & 3 deletions go/vt/vtgate/planbuilder/abstract/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ func TestOperator(t *testing.T) {
require.NoError(t, err)
tree, err := sqlparser.Parse(tc.query)
require.NoError(t, err)
stmt := tree.(sqlparser.SelectStatement)
stmt := tree.(sqlparser.Statement)
semTable, err := semantics.Analyze(stmt, "", si)
require.NoError(t, err)
optree, err := CreateOperatorFromAST(stmt, semTable)
optree, err := CreateLogicalOperatorFromAST(stmt, semTable)
require.NoError(t, err)
output := testString(optree)
if tc.expected != output {
Expand Down Expand Up @@ -164,8 +164,15 @@ func testString(op Operator) string {
dist = "(distinct)"
}
return fmt.Sprintf("Concatenate%s {\n%s\n}", dist, strings.Join(inners, ",\n"))
case *Update:
tbl := "table: " + op.Table.testString()
var assignments []string
for name, expr := range op.Assignments {
assignments = append(assignments, fmt.Sprintf("\t%s = %s", name, sqlparser.String(expr)))
}
return fmt.Sprintf("Update {\n\t%s\nassignments:\n%s\n}", tbl, strings.Join(assignments, "\n"))
}
return fmt.Sprintf("implement me: %T", op)
panic(fmt.Sprintf("%T", op))
}

func indent(s string) string {
Expand Down
32 changes: 32 additions & 0 deletions go/vt/vtgate/planbuilder/abstract/operator_test_data.txt
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,35 @@ OuterJoin: {
}
Predicate: t.id = y.t_id
}

update tbl set col1 = apa
Update {
table: TableSet{0}:tbl
assignments:
col1 = apa
}

update tbl set col1 = 'apa', col2 = 1337 where id = 12 and name = 'gangal'
Update {
table: TableSet{0}:tbl where id = 12 and `name` = 'gangal'
assignments:
col1 = 'apa'
col2 = 1337
}

update user set u = 1 where id = (select id from user_extra where id = 42)
SubQuery: {
SubQueries: [
{
Type: PulloutValue
Query: QueryGraph: {
Tables:
TableSet{1}:user_extra where id = 42
}
}]
Outer: Update {
table: TableSet{0}:`user` where id = (select id from user_extra where id = 42)
assignments:
u = 1
}
}
Loading