Skip to content

Commit

Permalink
feat: verify fk management requirement for update columns
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Aug 10, 2023
1 parent a30b19e commit c4d3409
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 40 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func fkManagementNotRequired(vschema plancontext.VSchema, vTables []*vindexes.Ta
if ksMode != vschemapb.Keyspace_FK_MANAGED {
continue
}
childFks := vTable.ChildFKsNeedsHandling()
childFks := vTable.ChildFKsNeedsHandling(vindexes.DeleteAction)
if len(childFks) > 0 {
return false
}
Expand Down
20 changes: 19 additions & 1 deletion go/vt/vtgate/planbuilder/operators/ast2op.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,24 @@ func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlpars
Routing: routing,
}

ksMode, err := ctx.VSchema.ForeignKeyMode(vindexTable.Keyspace.Name)
if err != nil {
return nil, err
}
if ksMode == vschemapb.Keyspace_FK_MANAGED {
childFks := vindexTable.ChildFKsNeedsHandling(vindexes.UpdateAction)
if len(childFks) > 0 {
// Check if any column in the parent table is being updated which has a child foreign key.
for _, updateExpr := range updStmt.Exprs {
for _, childFk := range childFks {
if childFk.ParentColumns.FindColumn(updateExpr.Name.Name) >= 0 {
return nil, vterrors.VT12003()
}
}
}
}
}

subq, err := createSubqueryFromStatement(ctx, updStmt)
if err != nil {
return nil, err
Expand Down Expand Up @@ -197,7 +215,7 @@ func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlp
return nil, err
}
if ksMode == vschemapb.Keyspace_FK_MANAGED {
childFks := vindexTable.ChildFKsNeedsHandling()
childFks := vindexTable.ChildFKsNeedsHandling(vindexes.DeleteAction)
if len(childFks) > 0 {
return nil, vterrors.VT12003()
}
Expand Down
57 changes: 32 additions & 25 deletions go/vt/vtgate/planbuilder/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,43 @@ func TestPlan(t *testing.T) {

// TestForeignKeyPlanning tests the planning of foreign keys in a managed mode by Vitess.
func TestForeignKeyPlanning(t *testing.T) {
vschemaWrapper := &vschemaWrapper{
v: loadSchema(t, "vschemas/schema.json", true),
// Set the keyspace with foreign keys enabled as the default.
keyspace: &vindexes.Keyspace{
Name: "user_fk_allow",
Sharded: true,
},
}
vschema := loadSchema(t, "vschemas/schema.json", true)
setFks(t, vschema)

vschemaWrapper := &vschemaWrapper{v: vschema}
testOutputTempDir := makeTestOutput(t)

testFile(t, "foreignkey_cases.json", testOutputTempDir, vschemaWrapper, false)
}

func setFks(t *testing.T, vschema *vindexes.VSchema) {
if vschema.Keyspaces["sharded_fk_allow"] != nil {
// FK from multicol_tbl2 referencing multicol_tbl1 that is shard scoped.
_ = vschema.AddForeignKey("sharded_fk_allow", "multicol_tbl2", createFkDefinition([]string{"colb", "cola", "x", "colc", "y"}, "multicol_tbl1", []string{"colb", "cola", "y", "colc", "x"}, sqlparser.Cascade, sqlparser.Cascade))
// FK from tbl2 referencing tbl1 that is shard scoped.
_ = vschema.AddForeignKey("sharded_fk_allow", "tbl2", createFkDefinition([]string{"col2"}, "tbl1", []string{"col1"}, sqlparser.Restrict, sqlparser.Restrict))
// FK from tbl3 referencing tbl1 that is not shard scoped.
_ = vschema.AddForeignKey("sharded_fk_allow", "tbl3", createFkDefinition([]string{"coly"}, "tbl1", []string{"col1"}, sqlparser.DefaultAction, sqlparser.DefaultAction))
// FK from tbl4 referencing tbl5 that is shard scoped.
_ = vschema.AddForeignKey("sharded_fk_allow", "tbl4", createFkDefinition([]string{"col4"}, "tbl5", []string{"col5"}, sqlparser.SetNull, sqlparser.SetNull))
// FK from tbl6 referencing tbl7 that is shard scoped.
_ = vschema.AddForeignKey("sharded_fk_allow", "tbl6", createFkDefinition([]string{"col6"}, "tbl7", []string{"col7"}, sqlparser.NoAction, sqlparser.NoAction))
}
if vschema.Keyspaces["unsharded_fk_allow"] != nil {
// u_tbl2(col2) -> u_tbl1(col1) Cascade.
// u_tbl3(col2) -> u_tbl2(col2) Cascade Null.
// u_tbl4(col41) -> u_tbl1(col14) Restrict.
// u_tbl4(col4) -> u_tbl3(col3) Restrict.
// u_tbl6(col6) -> u_tbl5(col5) Restrict.

_ = vschema.AddForeignKey("unsharded_fk_allow", "u_tbl2", createFkDefinition([]string{"col2"}, "u_tbl1", []string{"col1"}, sqlparser.Cascade, sqlparser.Cascade))
_ = vschema.AddForeignKey("unsharded_fk_allow", "u_tbl3", createFkDefinition([]string{"col3"}, "u_tbl2", []string{"col2"}, sqlparser.SetNull, sqlparser.SetNull))
_ = vschema.AddForeignKey("unsharded_fk_allow", "u_tbl4", createFkDefinition([]string{"col41"}, "u_tbl1", []string{"col14"}, sqlparser.NoAction, sqlparser.NoAction))
_ = vschema.AddForeignKey("unsharded_fk_allow", "u_tbl4", createFkDefinition([]string{"col4"}, "u_tbl3", []string{"col3"}, sqlparser.Restrict, sqlparser.Restrict))
_ = vschema.AddForeignKey("unsharded_fk_allow", "u_tbl6", createFkDefinition([]string{"col6"}, "u_tbl5", []string{"col5"}, sqlparser.DefaultAction, sqlparser.DefaultAction))
}
}

func TestSystemTables57(t *testing.T) {
// first we move everything to use 5.7 logic
oldVer := servenv.MySQLServerVersion()
Expand Down Expand Up @@ -439,23 +463,6 @@ func loadSchema(t testing.TB, filename string, setCollation bool) *vindexes.VSch
}
}
}
if vschema.Keyspaces["user_fk_allow"] != nil {
// FK from multicol_tbl2 referencing multicol_tbl1 that is shard scoped.
err = vschema.AddForeignKey("user_fk_allow", "multicol_tbl2", createFkDefinition([]string{"colb", "cola", "x", "colc", "y"}, "multicol_tbl1", []string{"colb", "cola", "y", "colc", "x"}, sqlparser.Cascade, sqlparser.Cascade))
require.NoError(t, err)
// FK from tbl2 referencing tbl1 that is shard scoped.
err = vschema.AddForeignKey("user_fk_allow", "tbl2", createFkDefinition([]string{"col2"}, "tbl1", []string{"col1"}, sqlparser.Restrict, sqlparser.Restrict))
require.NoError(t, err)
// FK from tbl3 referencing tbl1 that is not shard scoped.
err = vschema.AddForeignKey("user_fk_allow", "tbl3", createFkDefinition([]string{"coly"}, "tbl1", []string{"col1"}, sqlparser.DefaultAction, sqlparser.DefaultAction))
require.NoError(t, err)
// FK from tbl4 referencing tbl5 that is shard scoped.
err = vschema.AddForeignKey("user_fk_allow", "tbl4", createFkDefinition([]string{"col4"}, "tbl5", []string{"col5"}, sqlparser.SetNull, sqlparser.SetNull))
require.NoError(t, err)
// FK from tbl6 referencing tbl7 that is shard scoped.
err = vschema.AddForeignKey("user_fk_allow", "tbl6", createFkDefinition([]string{"col6"}, "tbl7", []string{"col7"}, sqlparser.NoAction, sqlparser.NoAction))
require.NoError(t, err)
}
return vschema
}

Expand Down
61 changes: 55 additions & 6 deletions go/vt/vtgate/planbuilder/testdata/foreignkey_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"OperatorType": "Insert",
"Variant": "Sharded",
"Keyspace": {
"Name": "user_fk_allow",
"Name": "sharded_fk_allow",
"Sharded": true
},
"TargetTabletType": "PRIMARY",
Expand All @@ -25,7 +25,7 @@
}
},
"TablesUsed": [
"user_fk_allow.tbl2"
"sharded_fk_allow.tbl2"
]
}
},
Expand All @@ -39,7 +39,7 @@
"OperatorType": "Insert",
"Variant": "Sharded",
"Keyspace": {
"Name": "user_fk_allow",
"Name": "sharded_fk_allow",
"Sharded": true
},
"TargetTabletType": "PRIMARY",
Expand All @@ -50,7 +50,7 @@
}
},
"TablesUsed": [
"user_fk_allow.multicol_tbl2"
"sharded_fk_allow.multicol_tbl2"
]
}
},
Expand All @@ -69,15 +69,15 @@
"OperatorType": "Delete",
"Variant": "Scatter",
"Keyspace": {
"Name": "user_fk_allow",
"Name": "sharded_fk_allow",
"Sharded": true
},
"TargetTabletType": "PRIMARY",
"Query": "delete from tbl7",
"Table": "tbl7"
},
"TablesUsed": [
"user_fk_allow.tbl7"
"sharded_fk_allow.tbl7"
]
}
},
Expand All @@ -90,5 +90,54 @@
"comment": "Delete in a table with shard-scoped foreign keys with cascade disallowed",
"query": "delete from tbl5",
"plan": "VT12002: unsupported: foreign keys management at vitess"
},
{
"comment": "update in unsharded table with restrict",
"query": "update u_tbl5 set col5 = 'foo' where id = 1",
"plan": {
"QueryType": "UPDATE",
"Original": "update u_tbl5 set col5 = 'foo' where id = 1",
"Instructions": {
"OperatorType": "Update",
"Variant": "Unsharded",
"Keyspace": {
"Name": "unsharded_fk_allow",
"Sharded": false
},
"TargetTabletType": "PRIMARY",
"Query": "update u_tbl5 set col5 = 'foo' where id = 1",
"Table": "u_tbl5"
},
"TablesUsed": [
"unsharded_fk_allow.u_tbl5"
]
}
},
{
"comment": "update in unsharded table with cascade",
"query": "update u_tbl2 set col2 = 'bar' where id = 1",
"plan": "VT12002: unsupported: foreign keys management at vitess"
},
{
"comment": "update in unsharded table with cascade - on non-referenced column",
"query": "update u_tbl2 set col_no_ref = 'baz' where id = 1",
"plan": {
"QueryType": "UPDATE",
"Original": "update u_tbl2 set col_no_ref = 'baz' where id = 1",
"Instructions": {
"OperatorType": "Update",
"Variant": "Unsharded",
"Keyspace": {
"Name": "unsharded_fk_allow",
"Sharded": false
},
"TargetTabletType": "PRIMARY",
"Query": "update u_tbl2 set col_no_ref = 'baz' where id = 1",
"Table": "u_tbl2"
},
"TablesUsed": [
"unsharded_fk_allow.u_tbl2"
]
}
}
]
13 changes: 12 additions & 1 deletion go/vt/vtgate/planbuilder/testdata/vschemas/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@
}
}
},
"user_fk_allow": {
"sharded_fk_allow": {
"sharded": true,
"foreignKeyMode": "FK_MANAGED",
"vindexes": {
Expand Down Expand Up @@ -696,6 +696,17 @@
]
}
}
},
"unsharded_fk_allow": {
"foreignKeyMode": "FK_MANAGED",
"tables": {
"u_tbl1": {},
"u_tbl2": {},
"u_tbl3": {},
"u_tbl4": {},
"u_tbl5": {},
"u_tbl6": {}
}
}
}
}
47 changes: 44 additions & 3 deletions go/vt/vtgate/planbuilder/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package planbuilder

import (
querypb "vitess.io/vitess/go/vt/proto/query"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
Expand Down Expand Up @@ -54,9 +55,11 @@ func gen4UpdateStmtPlanner(
}

if ks, tables := semTable.SingleUnshardedKeyspace(); ks != nil {
plan := updateUnshardedShortcut(updStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, updStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
if fkManagementNotRequiredForUpdate(semTable, vschema, tables, updStmt.Exprs) {
plan := updateUnshardedShortcut(updStmt, ks, tables)
plan = pushCommentDirectivesOnPlan(plan, updStmt)
return newPlanResult(plan.Primitive(), operators.QualifiedTables(ks, tables)...), nil
}
}

if semTable.NotUnshardedErr != nil {
Expand Down Expand Up @@ -91,6 +94,44 @@ func gen4UpdateStmtPlanner(
return newPlanResult(plan.Primitive(), operators.TablesUsed(op)...), nil
}

func fkManagementNotRequiredForUpdate(semTable *semantics.SemTable, vschema plancontext.VSchema, vTables []*vindexes.Table, updateExprs sqlparser.UpdateExprs) bool {
childFkMap := make(map[string][]vindexes.ChildFKInfo)

// Find the foreign key mode and check for any managed child foreign keys.
for _, vTable := range vTables {
ksMode, err := vschema.ForeignKeyMode(vTable.Keyspace.Name)
if err != nil {
return false
}
if ksMode != vschemapb.Keyspace_FK_MANAGED {
continue
}
childFks := vTable.ChildFKsNeedsHandling(vindexes.UpdateAction)
if len(childFks) > 0 {
childFkMap[vTable.String()] = childFks
}
}

// Check if any column in the parent table is being updated which has a child foreign key.
for _, updateExpr := range updateExprs {
tblInfo, err := semTable.TableInfoForExpr(updateExpr.Name)
if err != nil {
continue
}
vTable := tblInfo.GetVindexTable()
childFks, exists := childFkMap[vTable.String()]
if !exists {
continue
}
for _, childFk := range childFks {
if childFk.ParentColumns.FindColumn(updateExpr.Name.Name) >= 0 {
return false
}
}
}
return true
}

func updateUnshardedShortcut(stmt *sqlparser.Update, ks *vindexes.Keyspace, tables []*vindexes.Table) logicalPlan {
edml := engine.NewDML()
edml.Keyspace = ks
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vtgate/vindexes/foreign_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (t *Table) CrossShardParentFKs() (crossShardFKs []ParentFKInfo) {

// ChildFKsNeedsHandling retuns the child foreign keys that needs to be handled by the vtgate.
// This can be either the foreign key is not shard scoped or the child tables needs cascading.
func (t *Table) ChildFKsNeedsHandling() (fks []ChildFKInfo) {
func (t *Table) ChildFKsNeedsHandling(getAction func(fk ChildFKInfo) sqlparser.ReferenceAction) (fks []ChildFKInfo) {
for _, fk := range t.ChildForeignKeys {
// If the keyspaces are different, then the fk definition
// is going to go across shards.
Expand All @@ -143,7 +143,7 @@ func (t *Table) ChildFKsNeedsHandling() (fks []ChildFKInfo) {
continue
}
// If the action is not Restrict, then it needs a cascade.
switch fk.OnDelete {
switch getAction(fk) {
case sqlparser.Cascade, sqlparser.SetNull, sqlparser.SetDefault:
fks = append(fks, fk)
continue
Expand All @@ -159,6 +159,9 @@ func (t *Table) ChildFKsNeedsHandling() (fks []ChildFKInfo) {
return
}

func UpdateAction(fk ChildFKInfo) sqlparser.ReferenceAction { return fk.OnUpdate }
func DeleteAction(fk ChildFKInfo) sqlparser.ReferenceAction { return fk.OnDelete }

func isShardScoped(pTable *Table, cTable *Table, pCols sqlparser.Columns, cCols sqlparser.Columns) bool {
if !pTable.Keyspace.Sharded {
return true
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/vindexes/foreign_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,10 @@ func TestChildFKs(t *testing.T) {
},
expChildTbls: []string{"t1"},
}}
deleteAction := func(fk ChildFKInfo) sqlparser.ReferenceAction { return fk.OnDelete }
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
childFks := tt.table.ChildFKsNeedsHandling()
childFks := tt.table.ChildFKsNeedsHandling(deleteAction)
var actualChildTbls []string
for _, fk := range childFks {
actualChildTbls = append(actualChildTbls, fk.Table.Name.String())
Expand Down

0 comments on commit c4d3409

Please sign in to comment.