Skip to content

Commit

Permalink
Merge pull request #7695 from planetscale/fix-flush-tables
Browse files Browse the repository at this point in the history
fix flush statement planner
  • Loading branch information
harshit-gangal committed Mar 16, 2021
2 parents deaf818 + 2caf3d5 commit 83662d7
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 39 deletions.
74 changes: 35 additions & 39 deletions go/vt/vtgate/planbuilder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,16 @@ func buildFlushPlan(stmt *sqlparser.Flush, vschema ContextVSchema) (engine.Primi
}

func buildFlushOptions(stmt *sqlparser.Flush, vschema ContextVSchema) (engine.Primitive, error) {
destination, keyspace, _, err := vschema.TargetDestination("")
dest, keyspace, _, err := vschema.TargetDestination("")
if err != nil {
return nil, err
}
if dest == nil {
dest = key.DestinationAllShards{}
}
return &engine.Send{
Keyspace: keyspace,
TargetDestination: destination,
TargetDestination: dest,
Query: sqlparser.String(stmt),
IsDML: false,
SingleShardOnly: false,
Expand All @@ -284,64 +287,50 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema ContextVSchema) (engine.Pri
dest key.Destination
}

flushStatements := map[sendDest]*sqlparser.Flush{}
dest := vschema.Destination()
if dest == nil {
dest = key.DestinationAllShards{}
}

tablesMap := make(map[sendDest]sqlparser.TableNames)
var keys []sendDest
for i, tab := range stmt.TableNames {
var destinationTab key.Destination
var keyspaceTab *vindexes.Keyspace
var ksTab *vindexes.Keyspace
var table *vindexes.Table
var err error
table, _, _, _, destinationTab, err = vschema.FindTableOrVindex(tab)

table, _, _, _, _, err = vschema.FindTableOrVindex(tab)
if err != nil {
return nil, err
}
if table == nil {
return nil, vindexes.NotFoundError{TableName: tab.Name.String()}
}
keyspaceTab = table.Keyspace

ksTab = table.Keyspace
stmt.TableNames[i] = sqlparser.TableName{
Name: table.Name,
}
if destinationTab == nil {
destinationTab = key.DestinationAllShards{}
}

flush, isAvail := flushStatements[sendDest{keyspaceTab, destinationTab}]
if isAvail {
flush.TableNames = append(flush.TableNames, stmt.TableNames[i])
} else {
flush = &sqlparser.Flush{
IsLocal: stmt.IsLocal,
FlushOptions: nil,
TableNames: sqlparser.TableNames{stmt.TableNames[i]},
WithLock: stmt.WithLock,
ForExport: stmt.ForExport,
}
key := sendDest{ksTab, dest}
tables, isAvail := tablesMap[key]
if !isAvail {
keys = append(keys, key)
}
flushStatements[sendDest{keyspaceTab, destinationTab}] = flush
tables = append(tables, stmt.TableNames[i]) // = append(tables.TableNames, stmt.TableNames[i])
tablesMap[key] = tables
}

if len(flushStatements) == 1 {
for sendDest, flush := range flushStatements {
if len(tablesMap) == 1 {
for sendDest, tables := range tablesMap {
return &engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(flush),
IsDML: false,
SingleShardOnly: false,
Query: sqlparser.String(newFlushStmt(stmt, tables)),
}, nil
}
}

keys := make([]sendDest, len(flushStatements))

// Collect keys of the map
i := 0
for k := range flushStatements {
keys[i] = k
i++
}

sort.Slice(keys, func(i, j int) bool {
return keys[i].ks.Name < keys[j].ks.Name
})
Expand All @@ -353,12 +342,19 @@ func buildFlushTables(stmt *sqlparser.Flush, vschema ContextVSchema) (engine.Pri
plan := &engine.Send{
Keyspace: sendDest.ks,
TargetDestination: sendDest.dest,
Query: sqlparser.String(flushStatements[sendDest]),
IsDML: false,
SingleShardOnly: false,
Query: sqlparser.String(newFlushStmt(stmt, tablesMap[sendDest])),
}
finalPlan.Sources = append(finalPlan.Sources, plan)
}

return finalPlan, nil
}

func newFlushStmt(stmt *sqlparser.Flush, tables sqlparser.TableNames) *sqlparser.Flush {
return &sqlparser.Flush{
IsLocal: stmt.IsLocal,
TableNames: tables,
WithLock: stmt.WithLock,
ForExport: stmt.ForExport,
}
}
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/flush_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"Name": "main",
"Sharded": false
},
"TargetDestination": "AllShards()",
"IsDML": false,
"Query": "flush local tables with read lock",
"SingleShardOnly": false
Expand All @@ -44,6 +45,7 @@
"Name": "main",
"Sharded": false
},
"TargetDestination": "AllShards()",
"IsDML": false,
"Query": "flush local hosts, logs",
"SingleShardOnly": false
Expand Down

0 comments on commit 83662d7

Please sign in to comment.