Skip to content

Commit

Permalink
WiP updates
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 14, 2024
1 parent c000f07 commit 2901dab
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 40 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/vreplication/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ create table nopk (name varchar(128), age int unsigned);
],
"auto_increment": {
"column": "cid",
"sequence": "customer_seq"
"sequence": "` + "`customer_seq`" + `"
}
},
"orders": {
Expand Down Expand Up @@ -345,7 +345,7 @@ create table nopk (name varchar(128), age int unsigned);
],
"auto_increment": {
"column": "cid",
"sequence": "customer_seq"
"sequence": "` + "`customer_seq`" + `"
}
},
"orders": {
Expand Down
80 changes: 63 additions & 17 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,8 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
for tableName, tableDef := range kvs.Tables {
// The table name can be escaped in the vschema definition.
if tableName, err = sqlescape.UnescapeID(tableName); err != nil {
return err
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name in keyspace %s: %v",
keyspace, err)
}
select {
case <-sctx.Done():
Expand Down Expand Up @@ -1434,10 +1435,10 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
searchGroup, gctx := errgroup.WithContext(ctx)
searchCompleted := make(chan struct{})
for _, keyspace := range keyspaces {
// The keyspace name can be escaped in the vschema definition.
// The keyspace name could be escaped so we need to unescape it.
keyspace, err := sqlescape.UnescapeID(keyspace)
if err != nil {
return nil, err
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid keyspace name %s: %v", keyspace, err)
}
searchGroup.Go(func() error {
return searchKeyspace(gctx, searchCompleted, keyspace)
Expand Down Expand Up @@ -1469,13 +1470,22 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
targetDBName := targets[0].GetPrimary().DbName()
sequencesByBackingTable := make(map[string]*sequenceMetadata)

for _, table := range ts.Tables() {
for _, table := range ts.tables {
vs, ok := vschema.Tables[table]
if !ok {
// Try the escaped table name as it can be escaped in the vschema.
vs, ok = vschema.Tables[sqlescape.EscapeID(table)]
}
if !ok || vs.GetAutoIncrement().GetSequence() == "" {
continue
}
var err error
// Be sure that the table name is now unescaped.
table, err = sqlescape.UnescapeID(table)
if err != nil {
return nil, false, err
}
sm := &sequenceMetadata{
backingTableName: vs.AutoIncrement.Sequence,
usingTableName: table,
usingTableDefinition: vs,
usingTableDBName: targetDBName,
Expand All @@ -1490,20 +1500,25 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
}
// Unescape the table name and keyspace name as they may be escaped in the
// vschema definition if they e.g. contain dashes.
var err error
if tableName, err = sqlescape.UnescapeID(tableName); err != nil {
return nil, false, err
}
if keyspace, err = sqlescape.UnescapeID(keyspace); err != nil {
return nil, false, err
return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid keyspace in qualified sequence table name %s defined in the %s keyspace",
vs.AutoIncrement.Sequence, ts.targetKeyspace)
}
if tableName, err = sqlescape.UnescapeID(tableName); err != nil {
return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid qualified sequence table name %s defined in the %s keyspace",
vs.AutoIncrement.Sequence, ts.targetKeyspace)
}
sm.backingTableName = tableName
sm.backingTableKeyspace = keyspace
sm.backingTableName = tableName
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
} else {
sm.backingTableName, err = sqlescape.UnescapeID(vs.AutoIncrement.Sequence)
if err != nil {
return nil, false, err
}
allFullyQualified = false
}
sequencesByBackingTable[sm.backingTableName] = sm
Expand Down Expand Up @@ -1534,10 +1549,25 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target shard %s/%s",
ts.targetKeyspace, target.GetShard().ShardName())
}
usingCol, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDefinition.AutoIncrement.Column)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid column name %s specified for sequence table %s: %v",
sequenceMetadata.usingTableDefinition.AutoIncrement.Column, sequenceMetadata.usingTableName, err)
}
usingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDBName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s for sequence table %s: %v",
sequenceMetadata.usingTableDBName, sequenceMetadata.usingTableName, err)
}
usingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name %s: %v",
sequenceMetadata.usingTableName, err)
}
query := sqlparser.BuildParsedQuery(sqlGetMaxSequenceVal,
sqlescape.EscapeID(sequenceMetadata.usingTableDefinition.AutoIncrement.Column),
sqlescape.EscapeID(sequenceMetadata.usingTableDBName),
sqlescape.EscapeID(sequenceMetadata.usingTableName),
usingCol,
usingDB,
usingTable,
)
qr, terr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primary.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{
Query: []byte(query.Query),
Expand Down Expand Up @@ -1598,9 +1628,19 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen
if sequenceTablet.DbNameOverride != "" {
sequenceMetadata.backingTableDBName = sequenceTablet.DbNameOverride
}
backingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableDBName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s in sequence backing table %s: %v",
sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, err)
}
backingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v",
sequenceMetadata.backingTableName, err)
}
query := sqlparser.BuildParsedQuery(sqlInitSequenceTable,
sqlescape.EscapeID(sequenceMetadata.backingTableDBName),
sqlescape.EscapeID(sequenceMetadata.backingTableName),
backingDB,
backingTable,
nextVal,
nextVal,
nextVal,
Expand Down Expand Up @@ -1633,7 +1673,13 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get primary tablet for keyspace %s: %v",
sequenceMetadata.backingTableKeyspace, ierr)
}
ierr = ts.TabletManagerClient().ResetSequences(ictx, ti.Tablet, []string{sequenceMetadata.backingTableName})
// ResetSequences interfaces with the schema engine and the actual
// table identifiers DO NOT contain the backticks. So we have to
// ensure that the table name is unescaped.
if backingTable, err = sqlescape.UnescapeID(backingTable); err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s: %v", backingTable, err)
}
ierr = ts.TabletManagerClient().ResetSequences(ictx, ti.Tablet, []string{backingTable})
if ierr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to reset the sequence cache for backing table %s on shard %s/%s using tablet %s: %v",
sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr)
Expand Down
73 changes: 53 additions & 20 deletions go/vt/vtctl/workflow/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func TestGetTargetSequenceMetadata(t *testing.T) {
defer cancel()
cell := "cell1"
workflow := "wf1"
table := "t1"
table := "`t1`"
unescapedTable := "t1"
sourceKeyspace := &testKeyspace{
KeyspaceName: "source-ks",
ShardNames: []string{"0"},
Expand All @@ -82,26 +83,9 @@ func TestGetTargetSequenceMetadata(t *testing.T) {
"xxhash": {
Type: "xxhash",
},
"unicode_loose_xxhash": {
Type: "unicode_loose_xxhash",
},
}
env := newTestEnv(t, ctx, cell, sourceKeyspace, targetKeyspace)
defer env.close()
/*
env.tmc.schema = map[string]*tabletmanagerdatapb.SchemaDefinition{
"t1": {
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "t1",
Columns: []string{
"my-col",
},
},
},
},
}
*/

type testCase struct {
name string
Expand All @@ -116,7 +100,7 @@ func TestGetTargetSequenceMetadata(t *testing.T) {
want: nil,
},
{
name: "sequences with backticks all over",
name: "sequences with backticks and qualified table",
sourceVSchema: &vschema.Keyspace{
Vindexes: vindexes,
Tables: map[string]*vschema.Table{
Expand Down Expand Up @@ -147,7 +131,7 @@ func TestGetTargetSequenceMetadata(t *testing.T) {
backingTableName: "my-seq1",
backingTableKeyspace: "source-ks",
backingTableDBName: "vt_source-ks",
usingTableName: table,
usingTableName: unescapedTable,
usingTableDBName: "vt_targetks",
usingTableDefinition: &vschema.Table{
ColumnVindexes: []*vschema.ColumnVindex{
Expand All @@ -164,6 +148,55 @@ func TestGetTargetSequenceMetadata(t *testing.T) {
},
},
},
{
name: "sequences with backticks",
sourceVSchema: &vschema.Keyspace{
Vindexes: vindexes,
Tables: map[string]*vschema.Table{
"`my-seq1`": {
Type: "sequence",
},
},
},
targetVSchema: &vschema.Keyspace{
Vindexes: vindexes,
Tables: map[string]*vschema.Table{
table: {
ColumnVindexes: []*vschema.ColumnVindex{
{
Name: "xxhash",
Column: "`my-col`",
},
},
AutoIncrement: &vschema.AutoIncrement{
Column: "`my-col`",
Sequence: "`my-seq1`",
},
},
},
},
want: map[string]*sequenceMetadata{
"my-seq1": {
backingTableName: "my-seq1",
backingTableKeyspace: "source-ks",
backingTableDBName: "vt_source-ks",
usingTableName: unescapedTable,
usingTableDBName: "vt_targetks",
usingTableDefinition: &vschema.Table{
ColumnVindexes: []*vschema.ColumnVindex{
{
Column: "`my-col`",
Name: "xxhash",
},
},
AutoIncrement: &vschema.AutoIncrement{
Column: "`my-col`",
Sequence: "`my-seq1`",
},
},
},
},
},
}

for _, tc := range tests {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ func (se *Engine) ResetSequences(tables []string) error {
for _, tableName := range tables {
if table, ok := se.tables[tableName]; ok {
if table.SequenceInfo != nil {
log.Infof("Resetting sequence info for table %v: %s", tableName, table.SequenceInfo)
log.Infof("Resetting sequence info for table %s: %+v", tableName, table.SequenceInfo)
table.SequenceInfo.Reset()
}
} else {
Expand Down

0 comments on commit 2901dab

Please sign in to comment.