Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-cell tablet picker #6442

Merged
merged 9 commits into from
Jul 17, 2020
184 changes: 102 additions & 82 deletions go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"os"
"os/exec"
"strings"
_ "strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewVitessCluster(name string) (cluster *VitessCluster, err error) {
}

// InitCluster creates the global processes needed for a cluster
func InitCluster(t *testing.T, cellName string) *VitessCluster {
func InitCluster(t *testing.T, cellNames []string) *VitessCluster {
initGlobals()
vc, _ := NewVitessCluster("Vdemo")
assert.NotNil(t, vc)
Expand All @@ -101,20 +102,25 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster {
assert.Nil(t, topo.Setup("etcd2", nil))
topo.ManageTopoDir("mkdir", "/vitess/global")
vc.Topo = topo
topo.ManageTopoDir("mkdir", "/vitess/"+cellName)
for _, cellName := range cellNames {
topo.ManageTopoDir("mkdir", "/vitess/"+cellName)
}

vtctld := cluster.VtctldProcessInstance(globalConfig.vtctldPort, globalConfig.vtctldGrpcPort,
globalConfig.topoPort, globalConfig.hostname, globalConfig.tmpDir)
vc.Vtctld = vtctld
assert.NotNil(t, vc.Vtctld)
vc.Vtctld.Setup(cellName)
// use first cell as `-cell` and all cells as `-cells_to_watch`
vc.Vtctld.Setup(cellNames[0], "-cells_to_watch", strings.Join(cellNames, ","))

vc.Vtctl = cluster.VtctlProcessInstance(globalConfig.topoPort, globalConfig.hostname)
assert.NotNil(t, vc.Vtctl)
vc.Vtctl.AddCellInfo(cellName)
cell, err := vc.AddCell(t, cellName)
assert.Nil(t, err)
assert.NotNil(t, cell)
for _, cellName := range cellNames {
vc.Vtctl.AddCellInfo(cellName)
cell, err := vc.AddCell(t, cellName)
assert.Nil(t, err)
assert.NotNil(t, cell)
}

vc.VtctlClient = cluster.VtctlClientProcessInstance(globalConfig.hostname, vc.Vtctld.GrpcPort, globalConfig.tmpDir)
assert.NotNil(t, vc.VtctlClient)
Expand All @@ -123,7 +129,7 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster {
}

// AddKeyspace creates a keyspace with specified shard keys and number of replica/read-only tablets
func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) {
func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) {
keyspace := &Keyspace{
Name: ksName,
Shards: make(map[string]*Shard),
Expand All @@ -132,10 +138,16 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh
if err := vc.Vtctl.CreateKeyspace(keyspace.Name); err != nil {
t.Fatalf(err.Error())
}
cell.Keyspaces[ksName] = keyspace
if err := vc.AddShards(t, cell, keyspace, shards, numReplicas, numRdonly, tabletIDBase); err != nil {
t.Fatalf(err.Error())
cellsToWatch := ""
for i, cell := range cells {
if i > 0 {
cellsToWatch = cellsToWatch + ","
}
cell.Keyspaces[ksName] = keyspace
cellsToWatch = cellsToWatch + cell.Name
}
require.NoError(t, vc.AddShards(t, cells, keyspace, shards, numReplicas, numRdonly, tabletIDBase))

if schema != "" {
if err := vc.VtctlClient.ApplySchema(ksName, schema); err != nil {
t.Fatalf(err.Error())
Expand All @@ -148,9 +160,11 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh
}
}
keyspace.VSchema = vschema
if len(cell.Vtgates) == 0 {
fmt.Println("Starting vtgate")
vc.StartVtgate(t, cell)
for _, cell := range cells {
if len(cell.Vtgates) == 0 {
fmt.Println("Starting vtgate")
vc.StartVtgate(t, cell, cellsToWatch)
}
}
_ = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName)
return keyspace, nil
Expand Down Expand Up @@ -194,87 +208,92 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace,
}

// AddShards creates shards given list of comma-separated keys with specified tablets in each shard
func (vc *VitessCluster) AddShards(t *testing.T, cell *Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error {
func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error {
arrNames := strings.Split(names, ",")
fmt.Printf("Addshards got %d shards with %+v\n", len(arrNames), arrNames)
isSharded := len(arrNames) > 1
masterTabletUID := 0
for ind, shardName := range arrNames {
if _, ok := keyspace.Shards[shardName]; ok {
fmt.Printf("Shard %s already exists, not adding\n", shardName)
continue
}
tabletID := tabletIDBase + ind*100
tabletIndex := 0
dbProcesses := make([]*exec.Cmd, 0)
tablets := make([]*Tablet, 0)

fmt.Printf("Adding Shard %s\n", shardName)
if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil {
t.Fatalf("CreateShard command failed with %+v\n", err)
}

shard := &Shard{Name: shardName, IsSharded: isSharded, Tablets: make(map[string]*Tablet, 1)}
fmt.Println("Adding Master tablet")
master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, master)
tabletIndex++
master.Vttablet.VreplicationTabletType = "MASTER"
tablets = append(tablets, master)
dbProcesses = append(dbProcesses, proc)
for i := 0; i < numReplicas; i++ {
fmt.Println("Adding Replica tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
if _, ok := keyspace.Shards[shardName]; ok {
fmt.Printf("Shard %s already exists, not adding\n", shardName)
} else {
fmt.Printf("Adding Shard %s\n", shardName)
if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil {
t.Fatalf("CreateShard command failed with %+v\n", err)
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
keyspace.Shards[shardName] = shard
}
for i := 0; i < numRdonly; i++ {
fmt.Println("Adding RdOnly tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
for i, cell := range cells {
dbProcesses := make([]*exec.Cmd, 0)
tablets := make([]*Tablet, 0)
if i == 0 {
// only add master tablet for first cell, so first time CreateShard is called
fmt.Println("Adding Master tablet")
master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, master)
tabletIndex++
master.Vttablet.VreplicationTabletType = "MASTER"
tablets = append(tablets, master)
dbProcesses = append(dbProcesses, proc)
masterTabletUID = master.Vttablet.TabletUID
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}

keyspace.Shards[shardName] = shard
for ind, proc := range dbProcesses {
fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
for i := 0; i < numReplicas; i++ {
fmt.Println("Adding Replica tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}
}
for ind, tablet := range tablets {
fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name)
if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name),
keyspace.Name, false); err != nil {
t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet)
for i := 0; i < numRdonly; i++ {
fmt.Println("Adding RdOnly tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}
fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name)
if err := tablet.Vttablet.Setup(); err != nil {
t.Fatalf(err.Error())

for ind, proc := range dbProcesses {
fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
}
}
for ind, tablet := range tablets {
fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name)
if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name),
keyspace.Name, false); err != nil {
t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet)
}
fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name)
if err := tablet.Vttablet.Setup(); err != nil {
t.Fatalf(err.Error())
}
}
}
fmt.Printf("InitShardMaster for %d\n", master.Vttablet.TabletUID)
err = vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cell.Name, master.Vttablet.TabletUID)
if err != nil {
t.Fatal(err.Error())
}
require.NotEqual(t, 0, masterTabletUID, "Should have created a master tablet")
fmt.Printf("InitShardMaster for %d\n", masterTabletUID)
require.NoError(t, vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cells[0].Name, masterTabletUID))
fmt.Printf("Finished creating shard %s\n", shard.Name)
}
return nil
}

// DeleteShard deletes a shard
func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName string, shardName string) {
shard := vc.Cells[cellName].Keyspaces[ksName].Shards[shardName]
assert.NotNil(t, shard)
Expand All @@ -291,13 +310,13 @@ func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName strin
}

// StartVtgate starts a vtgate process
func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell) {
func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell, cellsToWatch string) {
vtgate := cluster.VtgateProcessInstance(
globalConfig.vtgatePort,
globalConfig.vtgateGrpcPort,
globalConfig.vtgateMySQLPort,
cell.Name,
cell.Name,
cellsToWatch,
globalConfig.hostname,
globalConfig.tabletTypes,
globalConfig.topoPort,
Expand Down Expand Up @@ -405,12 +424,13 @@ func (vc *VitessCluster) execTabletQuery(vttablet *cluster.VttabletProcess, quer
Uname: "vt_dba",
}
ctx := context.Background()
if conn, err := mysql.Connect(ctx, &vtParams); err != nil {
var conn *mysql.Conn
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
return nil, err
} else {
qr, err := conn.ExecuteFetch(query, 1000, true)
return qr, err
}
qr, err := conn.ExecuteFetch(query, 1000, true)
return qr, err
}

func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName string, tabletType string) map[string]*cluster.VttabletProcess {
Expand Down
Loading