Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Materialize: Only get schema from source tablets if target is missing tables #6601

Merged
merged 2 commits into from
Aug 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 51 additions & 31 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type materializer struct {
targetVSchema *vindexes.KeyspaceSchema
sourceShards []*topo.ShardInfo
targetShards []*topo.ShardInfo
mu sync.Mutex
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional, but could you mention why this mutex exists? It's not really protecting anything inside the materializer, right? I think it's guarding the call to getTableDDLs — and is that only an optimization? If so, would it make sense to declare it inside deploySchema instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this mutex is needed, but for a different reason, given below.

}

const (
Expand Down Expand Up @@ -589,51 +590,55 @@ func (wr *Wrangler) buildMaterializer(ctx context.Context, ms *vtctldatapb.Mater
}, nil
}

func (mz *materializer) getTableDDLs(ctx context.Context) (map[string]string, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func (mz *materializer) getTableDDLs(ctx context.Context) (map[string]string, error) {
func (mz *materializer) getSourceTableDDLs(ctx context.Context) (map[string]string, error) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should run only once per invocation, and every other goroutine should reuse its result: it should populate a new tableDDLs member of mz. Right now it returns the value to the calling goroutine, so it ends up executing once for each goroutine.

Using sync.Once would have been ideal, but you have to handle the error return. So, you may have to live with the global mu. So, it may be better to rename it to schemaMu.

tableDDLs := make(map[string]string)
allTables := []string{"/.*/"}

sourceMaster := mz.sourceShards[0].MasterAlias
if sourceMaster == nil {
return nil, fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName())
}

log.Infof("getting table schemas from source master %v...", sourceMaster)
var err error
sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false)
if err != nil {
return nil, err
}
log.Infof("got table schemas from source master %v.", sourceMaster)

for _, td := range sourceSchema.TableDefinitions {
tableDDLs[td.Name] = td.Schema
}
return tableDDLs, nil
}

func (mz *materializer) deploySchema(ctx context.Context) error {
var tableDDLs map[string]string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var tableDDLs map[string]string
var sourceDDLs map[string]string

I think this makes it clearer that the DDLs come from the source, hence the name.


return mz.forAllTargets(func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

hasTargetTable := map[string]bool{}
{
log.Infof("getting table schemas from target master %v...", target.MasterAlias)
targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false)
if err != nil {
return err
}
log.Infof("got table schemas from target master %v.", target.MasterAlias)

for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
}
log.Infof("getting table schemas from target master %v...", target.MasterAlias)
targetSchema, err := mz.wr.GetSchema(ctx, target.MasterAlias, allTables, nil, false)
if err != nil {
return err
}
log.Infof("got table schemas from target master %v.", target.MasterAlias)

sourceDDL := map[string]string{}
{
sourceMaster := mz.sourceShards[0].MasterAlias
if sourceMaster == nil {
return fmt.Errorf("source shard must have a master for copying schema: %v", mz.sourceShards[0].ShardName())
}

log.Infof("getting table schemas from source master %v...", sourceMaster)
var err error
sourceSchema, err := mz.wr.GetSchema(ctx, sourceMaster, allTables, nil, false)
if err != nil {
return err
}
log.Infof("got table schemas from source master %v.", sourceMaster)

for _, td := range sourceSchema.TableDefinitions {
sourceDDL[td.Name] = td.Schema
}
for _, td := range targetSchema.TableDefinitions {
hasTargetTable[td.Name] = true
}

targetTablet, err := mz.wr.ts.GetTablet(ctx, target.MasterAlias)
if err != nil {
return err
}

applyDDLs := []string{}
var applyDDLs []string
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this an empty clause?

}
for _, ts := range mz.ms.TableSettings {
if hasTargetTable[ts.TargetTable] {
// Table already exists.
Expand All @@ -642,6 +647,21 @@ func (mz *materializer) deploySchema(ctx context.Context) error {
if ts.CreateDdl == "" {
return fmt.Errorf("target table %v does not exist and there is no create ddl defined", ts.TargetTable)
}

var err error
mz.mu.Lock()
if len(tableDDLs) == 0 {
// Fetch the source table DDLs lazily, and only once: they are needed only when
// a schema has to be copied from the source to the target.
// Schemas are copied from the master of the source keyspace, and we have seen
// use cases where the source keyspace has just a replica (no master).
tableDDLs, err = mz.getTableDDLs(ctx)
}
mz.mu.Unlock()
if err != nil {
log.Errorf("Error getting DDLs of source tables: %s", err.Error())
return err
}

createDDL := ts.CreateDdl
if createDDL == createDDLAsCopy || createDDL == createDDLAsCopyDropConstraint {
if ts.SourceExpression != "" {
Expand All @@ -656,7 +676,7 @@ func (mz *materializer) deploySchema(ctx context.Context) error {
}
}

ddl, ok := sourceDDL[ts.TargetTable]
ddl, ok := tableDDLs[ts.TargetTable]
if !ok {
return fmt.Errorf("source table %v does not exist", ts.TargetTable)
}
Expand Down
33 changes: 28 additions & 5 deletions go/vt/wrangler/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package wrangler
import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -63,7 +64,6 @@ func newTestMaterializerEnv(t *testing.T, ms *vtctldatapb.MaterializeSettings, s
tmc: newTestMaterializerTMClient(),
}
env.wr = New(logutil.NewConsoleLogger(), env.topoServ, env.tmc)

tabletID := 100
for _, shard := range sources {
_ = env.addTablet(tabletID, env.ms.SourceKeyspace, shard, topodatapb.TabletType_MASTER)
Expand Down Expand Up @@ -161,18 +161,41 @@ type testMaterializerTMClient struct {
tmclient.TabletManagerClient
schema map[string]*tabletmanagerdatapb.SchemaDefinition

mu sync.Mutex
vrQueries map[int][]*queryResult
mu sync.Mutex
vrQueries map[int][]*queryResult
getSchemaCounts map[string]int
muSchemaCount sync.Mutex
}

func newTestMaterializerTMClient() *testMaterializerTMClient {
return &testMaterializerTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
getSchemaCounts: make(map[string]int),
}
}

// schemaRequested bumps the GetSchema request counter kept in getSchemaCounts
// for the tablet identified by uid. The counter map is keyed by the decimal
// string form of uid and is guarded by muSchemaCount.
func (tmc *testMaterializerTMClient) schemaRequested(uid uint32) {
	tabletKey := strconv.Itoa(int(uid))

	tmc.muSchemaCount.Lock()
	defer tmc.muSchemaCount.Unlock()
	// A key that is not yet present reads as zero, so the first request stores 1.
	tmc.getSchemaCounts[tabletKey]++
}

// getSchemaRequestCount returns the counter stored in getSchemaCounts for the
// tablet identified by uid, or 0 when no entry exists for that uid. The read
// is performed while holding muSchemaCount.
func (tmc *testMaterializerTMClient) getSchemaRequestCount(uid uint32) int {
	tabletKey := strconv.Itoa(int(uid))

	tmc.muSchemaCount.Lock()
	count := tmc.getSchemaCounts[tabletKey]
	tmc.muSchemaCount.Unlock()

	return count
}

func (tmc *testMaterializerTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
tmc.schemaRequested(tablet.Alias.Uid)
schemaDefn := &tabletmanagerdatapb.SchemaDefinition{}
for _, table := range tables {
// TODO: Add generalized regexps if needed for test purposes.
Expand Down
8 changes: 8 additions & 0 deletions go/vt/wrangler/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,6 +1699,8 @@ func TestMaterializerDeploySchema(t *testing.T) {
err := env.wr.Materialize(context.Background(), ms)
assert.NoError(t, err)
env.tmc.verifyQueries(t)
require.Equal(t, env.tmc.getSchemaRequestCount(100), 1)
require.Equal(t, env.tmc.getSchemaRequestCount(200), 1)
}

func TestMaterializerCopySchema(t *testing.T) {
Expand Down Expand Up @@ -1734,6 +1736,9 @@ func TestMaterializerCopySchema(t *testing.T) {
err := env.wr.Materialize(context.Background(), ms)
assert.NoError(t, err)
env.tmc.verifyQueries(t)
require.Equal(t, env.tmc.getSchemaRequestCount(100), 1)
require.Equal(t, env.tmc.getSchemaRequestCount(200), 1)

}

func TestMaterializerExplicitColumns(t *testing.T) {
Expand Down Expand Up @@ -1922,6 +1927,9 @@ func TestMaterializerNoDDL(t *testing.T) {

err := env.wr.Materialize(context.Background(), ms)
assert.EqualError(t, err, "target table t1 does not exist and there is no create ddl defined")
require.Equal(t, env.tmc.getSchemaRequestCount(100), 0)
require.Equal(t, env.tmc.getSchemaRequestCount(200), 1)

}

func TestMaterializerNoSourceMaster(t *testing.T) {
Expand Down