Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Materialize: Only get schema from source tablets if target is missing tables #6601

Merged
merged 2 commits into from
Aug 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 49 additions & 31 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,51 +589,54 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater
}, nil
}

// getSourceTableDDLs fetches the table schemas from the master tablet of the
// first source shard and returns them as a map from table name to that
// table's schema definition (td.Schema).
//
// It returns an error if there are no source shards, if the first source
// shard has no master alias, or if the GetSchema call fails.
func (mz *materializer) getSourceTableDDLs(ctx context.Context) (map[string]string, error) {
	// Guard the index below: an empty shard list would otherwise panic.
	if len(mz.sourceShards) == 0 {
		return nil, fmt.Errorf("no source shards available for copying schema")
	}

	sourceShard := mz.sourceShards[0]
	sourceMaster := sourceShard.MasterAlias
	if sourceMaster == nil {
		return nil, fmt.Errorf("source shard must have a master for copying schema: %v", sourceShard.ShardName())
	}

	// Table filter handed to GetSchema; the exclude list is nil and views are
	// not requested.
	allTables := []string{"/.*/"}

	log.Infof("getting table schemas from source master %v...", sourceMaster)
	sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false)
	if err != nil {
		return nil, err
	}
	log.Infof("got table schemas from source master %v.", sourceMaster)

	// Allocate the result only once the schema is known, sized to fit.
	sourceDDLs := make(map[string]string, len(sourceSchema.TableDefinitions))
	for _, td := range sourceSchema.TableDefinitions {
		sourceDDLs[td.Name] = td.Schema
	}
	return sourceDDLs, nil
}

func (mz *materializer) deploySchema(ctx context.Context) error {
var sourceDDLs map[string]string
var mu sync.Mutex

return mz.forAllTargets(func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

hasTargetTable := map[string]bool{}
{
log.Infof("getting table schemas from target master %v...", target.MasterAlias)
targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false)
if err != nil {
return err
}
log.Infof("got table schemas from target master %v.", target.MasterAlias)

for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
}
log.Infof("getting table schemas from target master %v...", target.MasterAlias)
targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false)
if err != nil {
return err
}
log.Infof("got table schemas from target master %v.", target.MasterAlias)

sourceDDL := map[string]string{}
{
sourceMaster := mz.sourceShards[0].MasterAlias
if sourceMaster == nil {
return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName())
}

log.Infof("getting table schemas from source master %v...", sourceMaster)
var err error
sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false)
if err != nil {
return err
}
log.Infof("got table schemas from source master %v.", sourceMaster)

for _, td := range sourceSchema.TableDefinitions {
sourceDDL[td.Name] = td.Schema
}
for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
}

targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias)
if err != nil {
return err
}

applyDDLs := []string{}
var applyDDLs []string
for _, ts := range mz.ms.TableSettings {
if hasTargetTable[ts.TargetTable] {
// Table already exists.
Expand All @@ -642,6 +645,21 @@ func (mz *materializer) deploySchema(ctx context.Context) error {
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
mu.Lock()
if len(sourceDDLs) == 0 {
// Fetch the source table DDLs lazily and at most once: they are only needed
// when a schema has to be copied from the source to the target.
// Schemas are copied from the masters of the source keyspace, and we have
// found use cases where the user has only a replica (no master) in the source keyspace.
sourceDDLs, err = mz.getSourceTableDDLs(ctx)
}
mu.Unlock()
if err != nil {
log.Errorf("Error getting DDLs of source tables: %s", err.Error())
return err
}

createDDL := ts.CreateDdl
if createDDL == createDDLAsCopy || createDDL == createDDLAsCopyDropConstraint {
if ts.SourceExpression != "" {
Expand All @@ -656,7 +674,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error {
}
}

ddl, ok := sourceDDL[ts.TargetTable]
ddl, ok := sourceDDLs[ts.TargetTable]
if !ok {
return fmt.Errorf("source table %v does not exist", ts.TargetTable)
}
Expand Down
33 changes: 28 additions & 5 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package wrangler
import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -63,7 +64,6 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s
tmc: newTestMaterializerTMClient(),
}
env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)

tabletID := 100
for _, shard := range sources {
_ = env.addTablet(tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_MASTER)
Expand Down Expand Up @@ -161,18 +161,41 @@ type testMaterializerTMClient struct {
tmclient.TabletManagerClient
schema map[string]*tabletmanagerdatapb.SchemaDefinition

mu sync.Mutex
vrQueries map[int][]*queryResult
mu sync.Mutex
vrQueries map[int][]*queryResult
getSchemaCounts map[string]int
muSchemaCount sync.Mutex
}

func newTestMaterializerTMClient() *testMaterializerTMClient {
return &testMaterializerTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
getSchemaCounts: make(map[string]int),
}
}

// schemaRequested increments the request counter kept in getSchemaCounts for
// the tablet with the given uid. The counter is keyed by the decimal string
// form of uid and is updated while holding muSchemaCount.
func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) {
	tmc.muSchemaCount.Lock()
	defer tmc.muSchemaCount.Unlock()
	// A missing key reads as zero, so a plain increment covers both the first
	// request and every later one.
	tmc.getSchemaCounts[strconv.Itoa(int(uid))]++
}

// getSchemaRequestCount returns the counter stored in getSchemaCounts for the
// tablet with the given uid, or zero if nothing has been recorded for it.
// The map is read while holding muSchemaCount.
func (tmc *testMaterializerTMClient) getSchemaRequestCount(uid uint32) int {
	tmc.muSchemaCount.Lock()
	count := tmc.getSchemaCounts[strconv.Itoa(int(uid))]
	tmc.muSchemaCount.Unlock()
	return count
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.schemaRequested(tablet.Alias.Uid)
schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range tables {
// TODO: Add generalized regexps if needed for test purposes.
Expand Down
8 changes: 8 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,8 @@ func TestMaterializerDeploySchema(t *testing.T) {
err := env.wr.Materialize(context.Background(), ms)
assert.NoError(t, err)
env.tmc.verifyQueries(t)
require.Equal(t, env.tmc.getSchemaRequestCount(100), 1)
require.Equal(t, env.tmc.getSchemaRequestCount(200), 1)
}

func TestMaterializerCopySchema(t *testing.T) {
Expand Down Expand Up @@ -1734,6 +1736,9 @@ func TestMaterializerCopySchema(t *testing.T) {
err := env.wr.Materialize(context.Background(), ms)
assert.NoError(t, err)
env.tmc.verifyQueries(t)
require.Equal(t, env.tmc.getSchemaRequestCount(100), 1)
require.Equal(t, env.tmc.getSchemaRequestCount(200), 1)

}

func TestMaterializerExplicitColumns(t *testing.T) {
Expand Down Expand Up @@ -1922,6 +1927,9 @@ func TestMaterializerNoDDL(t *testing.T) {

err := env.wr.Materialize(context.Background(), ms)
assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined")
require.Equal(t, env.tmc.getSchemaRequestCount(100), 0)
require.Equal(t, env.tmc.getSchemaRequestCount(200), 1)

}

func TestMaterializerNoSourceMaster(t *testing.T) {
Expand Down