Skip to content

Commit

Permalink
Minor changes after self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed May 13, 2024
1 parent efd373f commit 8282f55
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 24 deletions.
9 changes: 5 additions & 4 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
)

const appendEntry = -1
const (
appendEntry = -1
useQuery = "use `fakesqldb`"
)

// DB is a fake database and all its methods are thread safe. It
// creates a mysql.Listener and implements the mysql.Handler
Expand Down Expand Up @@ -201,7 +204,7 @@ func New(t testing.TB) *DB {
db.listener.Accept()
}()

db.AddQuery("use `fakesqldb`", &sqltypes.Result{})
db.AddQuery(useQuery, &sqltypes.Result{})
// Return the db.
return db
}
Expand Down Expand Up @@ -628,9 +631,7 @@ func (db *DB) DeleteAllQueries() {
clear(db.patternData)
clear(db.queryCalled)
// Use is always expected to be present.
useQuery := "use `fakesqldb`"
db.data[useQuery] = &ExpectedResult{&sqltypes.Result{}, nil}
db.queryCalled[useQuery] = 0
}

// AddRejectedQuery adds a query which will be rejected at execution time.
Expand Down
2 changes: 0 additions & 2 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,6 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')")
}
waitForRowCount(t, vtgateConn, "product", "customer2", 3+num+num)
//want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1)
//waitForQueryResult(t, vtgateConn, "product", "select max(cid) from customer2", want)
res := execVtgateQuery(t, vtgateConn, "product", "select max(cid) from customer2")
cid, err := res.Rows[0][0].ToInt()
require.NoError(t, err)
Expand Down
21 changes: 12 additions & 9 deletions go/vt/vttablet/tabletserver/schema/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,12 +669,13 @@ func (se *Engine) RegisterVersionEvent() error {
return se.historian.RegisterVersionEvent()
}

// GetTableForPos returns a best-effort schema for a specific gtid. If it cannot get
// the table schema for the gtid, it returns the latest table schema available in the
// database (updating the cache entry). If the table is not found in the cache, it will
// reload the cache from the database in case the table was created after the last schema
// reload or the cache has not yet been initialized. This function makes the schema
// cache a read-through cache for VReplication purposes.
// GetTableForPos makes a best-effort attempt to return a table's schema at a specific
// GTID/position. If it cannot get the table schema for the given GTID/position then it
// returns the latest table schema that is available in the database -- the table schema
// for the "current" GTID/position (updating the cache entry). If the table is not found
// in the cache, it will reload the cache from the database in case the table was created
// after the last schema reload or the cache has not yet been initialized. This function
// makes the schema cache a read-through cache for VReplication purposes.
func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.IdentifierCS, gtid string) (*binlogdatapb.MinimalTable, error) {
mt, err := se.historian.GetTableForPos(tableName, gtid)
if err != nil {
Expand All @@ -684,7 +685,7 @@ func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.Identi
if mt != nil {
return mt, nil
}
// We got nothing from the historian, which generally means that it's not enabled.
// We got nothing from the historian, which typically means that it's not enabled.
se.mu.Lock()
defer se.mu.Unlock()
tableNameStr := tableName.String()
Expand Down Expand Up @@ -721,8 +722,8 @@ func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.Identi
log.Infof("internal table %v found in vttablet schema: skipping for GTID search", tableNameStr)
return nil, nil
}
// We don't currently have the table in the cache. This can happen when a table
// was created after the last schema reload (which happens at least every
// We don't currently have the non-internal table in the cache. This can happen when
// a table was created after the last schema reload (which happens at least every
// --queryserver-config-schema-reload-time).
// Whatever the reason, we should ensure that our cache is able to get the latest
// table schema for the "current" position IF the table exists in the database.
Expand All @@ -731,6 +732,8 @@ func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.Identi
// cache for VReplication related needs (this function is only used by vstreamers).
// This adds an additional cost, but for VReplication it should be rare that we are
// trying to replicate a table that doesn't actually exist.
// This also allows us to perform a just-in-time initialization of the cache if
// a vstreamer is the first one to access it.
if se.conns != nil { // Test Engines (NewEngineForTests()) don't have a conns pool
if err := se.reload(ctx, true); err != nil {
return nil, err
Expand Down
32 changes: 23 additions & 9 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,9 +1316,19 @@ func TestGetTableForPos(t *testing.T) {
cfg.DB = newDBConfigs(fakedb)
table := sqlparser.NewIdentifierCS("t1")
column := "col1"
tableSchema := "create table t1 (col1 varchar(50), primary key(col1))"
tableSchema := fmt.Sprintf("create table %s (%s varchar(50), primary key(col1))", table.String(), column)
tableMt := &binlogdatapb.MinimalTable{
Name: table.String(),
Fields: []*querypb.Field{
{
Name: column,
Type: sqltypes.VarChar,
},
},
PKColumns: []int64{0}, // First column: col1
}

// Don't do any auto cache refreshes.
// Don't do any automatic / TTL based cache refreshes.
se := newEngine(1*time.Hour, 1*time.Hour, 0, fakedb)
se.conns.Open(se.cp, se.cp, se.cp)
se.isOpen = true
Expand All @@ -1333,7 +1343,8 @@ func TestGetTableForPos(t *testing.T) {
fmt.Sprintf("%d", time.Now().Unix()),
))
db.AddQuery(fmt.Sprintf(detectViewChange, sidecar.GetIdentifier()), sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name", "varchar")))
db.AddQuery(fmt.Sprintf(readTableCreateTimes, sidecar.GetIdentifier()), sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|create_time", "varchar|int64")))
db.AddQuery(fmt.Sprintf(readTableCreateTimes, sidecar.GetIdentifier()),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|create_time", "varchar|int64")))
db.AddQuery(fmt.Sprintf(detectUdfChange, sidecar.GetIdentifier()), &sqltypes.Result{})
db.AddQueryPattern(baseShowTablesPattern,
&sqltypes.Result{
Expand Down Expand Up @@ -1362,11 +1373,13 @@ func TestGetTableForPos(t *testing.T) {
})
db.AddQueryPattern(fmt.Sprintf(mysql.GetColumnNamesQueryPatternForTable, table.String()),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("column_name", "varchar"), column))
db.AddQuery(fmt.Sprintf("SELECT `%s` FROM `fakesqldb`.`%v` WHERE 1 != 1", column, table.String()), sqltypes.MakeTestResult(sqltypes.MakeTestFields(column, "varchar")))
db.AddQuery(fmt.Sprintf("SELECT `%s` FROM `fakesqldb`.`%v` WHERE 1 != 1", column, table.String()),
sqltypes.MakeTestResult(sqltypes.MakeTestFields(column, "varchar")))
db.AddQuery(fmt.Sprintf(`show create table %s`, table.String()),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("Table|Create Table", "varchar|varchar"), table.String(), tableSchema))
db.AddQuery("begin", &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("delete from %s.`tables` where TABLE_SCHEMA = database() and TABLE_NAME in ('%s')", sidecar.GetIdentifier(), table.String()), &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("delete from %s.`tables` where TABLE_SCHEMA = database() and TABLE_NAME in ('%s')",
sidecar.GetIdentifier(), table.String()), &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("insert into %s.`tables`(TABLE_SCHEMA, TABLE_NAME, CREATE_STATEMENT, CREATE_TIME) values (database(), '%s', '%s', %d)",
sidecar.GetIdentifier(), table.String(), tableSchema, time.Now().Unix()), &sqltypes.Result{RowsAffected: 1})
db.AddQuery("rollback", &sqltypes.Result{})
Expand All @@ -1389,7 +1402,7 @@ func TestGetTableForPos(t *testing.T) {
expectFunc: func() {
tbl, err := se.GetTableForPos(ctx, table, "")
require.NoError(t, err)
require.NotNil(t, tbl)
require.Equal(t, tableMt, tbl)
},
},
{
Expand All @@ -1415,7 +1428,7 @@ func TestGetTableForPos(t *testing.T) {
expectFunc: func() {
tbl, err := se.GetTableForPos(ctx, table, "")
require.NoError(t, err)
require.NotNil(t, tbl)
require.Equal(t, tableMt, tbl)
},
},
{
Expand All @@ -1424,7 +1437,7 @@ func TestGetTableForPos(t *testing.T) {
expectedQueriesFunc: func(db *fakesqldb.DB) {
// We only reload the column and PK info for the table in our cache. A new column
// called col2 has been added to the table schema and it is the new PK.
newTableSchema := "create table t1 (id int, col2 varchar, primary key(col2))"
newTableSchema := fmt.Sprintf("create table %s (%s varchar(50), col2 varchar(50), primary key(col2))", table.String(), column)
db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{
Fields: mysql.ShowPrimaryFields,
Rows: [][]sqltypes.Value{
Expand All @@ -1438,7 +1451,8 @@ func TestGetTableForPos(t *testing.T) {
db.AddQuery(fmt.Sprintf(`show create table %s`, table.String()),
sqltypes.MakeTestResult(sqltypes.MakeTestFields("Table|Create Table", "varchar|varchar"), table.String(), newTableSchema))
db.AddQuery("begin", &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("delete from %s.`tables` where TABLE_SCHEMA = database() and TABLE_NAME in ('%s')", sidecar.GetIdentifier(), table.String()), &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("delete from %s.`tables` where TABLE_SCHEMA = database() and TABLE_NAME in ('%s')",
sidecar.GetIdentifier(), table.String()), &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("insert into %s.`tables`(TABLE_SCHEMA, TABLE_NAME, CREATE_STATEMENT, CREATE_TIME) values (database(), '%s', '%s', %d)",
sidecar.GetIdentifier(), table.String(), newTableSchema, time.Now().Unix()), &sqltypes.Result{})
db.AddQuery("rollback", &sqltypes.Result{})
Expand Down

0 comments on commit 8282f55

Please sign in to comment.