Skip to content

Commit

Permalink
Merge pull request #6442 from planetscale/ds-tablet-picker-multicell
Browse files Browse the repository at this point in the history
Multi-cell tablet picker
  • Loading branch information
deepthi committed Jul 17, 2020
2 parents 709dbed + 2d9c8e3 commit f3066ec
Show file tree
Hide file tree
Showing 11 changed files with 555 additions and 216 deletions.
184 changes: 102 additions & 82 deletions go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"os"
"os/exec"
"strings"
_ "strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -91,7 +92,7 @@ func NewVitessCluster(name string) (cluster *VitessCluster, err error) {
}

// InitCluster creates the global processes needed for a cluster
func InitCluster(t *testing.T, cellName string) *VitessCluster {
func InitCluster(t *testing.T, cellNames []string) *VitessCluster {
initGlobals()
vc, _ := NewVitessCluster("Vdemo")
assert.NotNil(t, vc)
Expand All @@ -101,20 +102,25 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster {
assert.Nil(t, topo.Setup("etcd2", nil))
topo.ManageTopoDir("mkdir", "/vitess/global")
vc.Topo = topo
topo.ManageTopoDir("mkdir", "/vitess/"+cellName)
for _, cellName := range cellNames {
topo.ManageTopoDir("mkdir", "/vitess/"+cellName)
}

vtctld := cluster.VtctldProcessInstance(globalConfig.vtctldPort, globalConfig.vtctldGrpcPort,
globalConfig.topoPort, globalConfig.hostname, globalConfig.tmpDir)
vc.Vtctld = vtctld
assert.NotNil(t, vc.Vtctld)
vc.Vtctld.Setup(cellName)
// use first cell as `-cell` and all cells as `-cells_to_watch`
vc.Vtctld.Setup(cellNames[0], "-cells_to_watch", strings.Join(cellNames, ","))

vc.Vtctl = cluster.VtctlProcessInstance(globalConfig.topoPort, globalConfig.hostname)
assert.NotNil(t, vc.Vtctl)
vc.Vtctl.AddCellInfo(cellName)
cell, err := vc.AddCell(t, cellName)
assert.Nil(t, err)
assert.NotNil(t, cell)
for _, cellName := range cellNames {
vc.Vtctl.AddCellInfo(cellName)
cell, err := vc.AddCell(t, cellName)
assert.Nil(t, err)
assert.NotNil(t, cell)
}

vc.VtctlClient = cluster.VtctlClientProcessInstance(globalConfig.hostname, vc.Vtctld.GrpcPort, globalConfig.tmpDir)
assert.NotNil(t, vc.VtctlClient)
Expand All @@ -123,7 +129,7 @@ func InitCluster(t *testing.T, cellName string) *VitessCluster {
}

// AddKeyspace creates a keyspace with specified shard keys and number of replica/read-only tablets
func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) {
func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string, shards string, vschema string, schema string, numReplicas int, numRdonly int, tabletIDBase int) (*Keyspace, error) {
keyspace := &Keyspace{
Name: ksName,
Shards: make(map[string]*Shard),
Expand All @@ -132,10 +138,16 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh
if err := vc.Vtctl.CreateKeyspace(keyspace.Name); err != nil {
t.Fatalf(err.Error())
}
cell.Keyspaces[ksName] = keyspace
if err := vc.AddShards(t, cell, keyspace, shards, numReplicas, numRdonly, tabletIDBase); err != nil {
t.Fatalf(err.Error())
cellsToWatch := ""
for i, cell := range cells {
if i > 0 {
cellsToWatch = cellsToWatch + ","
}
cell.Keyspaces[ksName] = keyspace
cellsToWatch = cellsToWatch + cell.Name
}
require.NoError(t, vc.AddShards(t, cells, keyspace, shards, numReplicas, numRdonly, tabletIDBase))

if schema != "" {
if err := vc.VtctlClient.ApplySchema(ksName, schema); err != nil {
t.Fatalf(err.Error())
Expand All @@ -148,9 +160,11 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cell *Cell, ksName string, sh
}
}
keyspace.VSchema = vschema
if len(cell.Vtgates) == 0 {
fmt.Println("Starting vtgate")
vc.StartVtgate(t, cell)
for _, cell := range cells {
if len(cell.Vtgates) == 0 {
fmt.Println("Starting vtgate")
vc.StartVtgate(t, cell, cellsToWatch)
}
}
_ = vc.VtctlClient.ExecuteCommand("RebuildKeyspaceGraph", ksName)
return keyspace, nil
Expand Down Expand Up @@ -194,87 +208,92 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace,
}

// AddShards creates shards given list of comma-separated keys with specified tablets in each shard
func (vc *VitessCluster) AddShards(t *testing.T, cell *Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error {
func (vc *VitessCluster) AddShards(t *testing.T, cells []*Cell, keyspace *Keyspace, names string, numReplicas int, numRdonly int, tabletIDBase int) error {
arrNames := strings.Split(names, ",")
fmt.Printf("Addshards got %d shards with %+v\n", len(arrNames), arrNames)
isSharded := len(arrNames) > 1
masterTabletUID := 0
for ind, shardName := range arrNames {
if _, ok := keyspace.Shards[shardName]; ok {
fmt.Printf("Shard %s already exists, not adding\n", shardName)
continue
}
tabletID := tabletIDBase + ind*100
tabletIndex := 0
dbProcesses := make([]*exec.Cmd, 0)
tablets := make([]*Tablet, 0)

fmt.Printf("Adding Shard %s\n", shardName)
if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil {
t.Fatalf("CreateShard command failed with %+v\n", err)
}

shard := &Shard{Name: shardName, IsSharded: isSharded, Tablets: make(map[string]*Tablet, 1)}
fmt.Println("Adding Master tablet")
master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, master)
tabletIndex++
master.Vttablet.VreplicationTabletType = "MASTER"
tablets = append(tablets, master)
dbProcesses = append(dbProcesses, proc)
for i := 0; i < numReplicas; i++ {
fmt.Println("Adding Replica tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
if _, ok := keyspace.Shards[shardName]; ok {
fmt.Printf("Shard %s already exists, not adding\n", shardName)
} else {
fmt.Printf("Adding Shard %s\n", shardName)
if err := vc.VtctlClient.ExecuteCommand("CreateShard", keyspace.Name+"/"+shardName); err != nil {
t.Fatalf("CreateShard command failed with %+v\n", err)
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
keyspace.Shards[shardName] = shard
}
for i := 0; i < numRdonly; i++ {
fmt.Println("Adding RdOnly tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
for i, cell := range cells {
dbProcesses := make([]*exec.Cmd, 0)
tablets := make([]*Tablet, 0)
if i == 0 {
// only add master tablet for first cell, so first time CreateShard is called
fmt.Println("Adding Master tablet")
master, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, master)
tabletIndex++
master.Vttablet.VreplicationTabletType = "MASTER"
tablets = append(tablets, master)
dbProcesses = append(dbProcesses, proc)
masterTabletUID = master.Vttablet.TabletUID
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}

keyspace.Shards[shardName] = shard
for ind, proc := range dbProcesses {
fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
for i := 0; i < numReplicas; i++ {
fmt.Println("Adding Replica tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "replica", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}
}
for ind, tablet := range tablets {
fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name)
if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name),
keyspace.Name, false); err != nil {
t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet)
for i := 0; i < numRdonly; i++ {
fmt.Println("Adding RdOnly tablet")
tablet, proc, err := vc.AddTablet(t, cell, keyspace, shard, "rdonly", tabletID+tabletIndex)
if err != nil {
t.Fatalf(err.Error())
}
assert.NotNil(t, tablet)
tabletIndex++
tablets = append(tablets, tablet)
dbProcesses = append(dbProcesses, proc)
}
fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name)
if err := tablet.Vttablet.Setup(); err != nil {
t.Fatalf(err.Error())

for ind, proc := range dbProcesses {
fmt.Printf("Waiting for mysql process for tablet %s\n", tablets[ind].Name)
if err := proc.Wait(); err != nil {
t.Fatalf("%v :: Unable to start mysql server for %v", err, tablets[ind].Vttablet)
}
}
for ind, tablet := range tablets {
fmt.Printf("Creating vt_keyspace database for tablet %s\n", tablets[ind].Name)
if _, err := tablet.Vttablet.QueryTablet(fmt.Sprintf("create database vt_%s", keyspace.Name),
keyspace.Name, false); err != nil {
t.Fatalf("Unable to start create database vt_%s for tablet %v", keyspace.Name, tablet.Vttablet)
}
fmt.Printf("Running Setup() for vttablet %s\n", tablets[ind].Name)
if err := tablet.Vttablet.Setup(); err != nil {
t.Fatalf(err.Error())
}
}
}
fmt.Printf("InitShardMaster for %d\n", master.Vttablet.TabletUID)
err = vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cell.Name, master.Vttablet.TabletUID)
if err != nil {
t.Fatal(err.Error())
}
require.NotEqual(t, 0, masterTabletUID, "Should have created a master tablet")
fmt.Printf("InitShardMaster for %d\n", masterTabletUID)
require.NoError(t, vc.VtctlClient.InitShardMaster(keyspace.Name, shardName, cells[0].Name, masterTabletUID))
fmt.Printf("Finished creating shard %s\n", shard.Name)
}
return nil
}

// DeleteShard deletes a shard
func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName string, shardName string) {
shard := vc.Cells[cellName].Keyspaces[ksName].Shards[shardName]
assert.NotNil(t, shard)
Expand All @@ -291,13 +310,13 @@ func (vc *VitessCluster) DeleteShard(t *testing.T, cellName string, ksName strin
}

// StartVtgate starts a vtgate process
func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell) {
func (vc *VitessCluster) StartVtgate(t *testing.T, cell *Cell, cellsToWatch string) {
vtgate := cluster.VtgateProcessInstance(
globalConfig.vtgatePort,
globalConfig.vtgateGrpcPort,
globalConfig.vtgateMySQLPort,
cell.Name,
cell.Name,
cellsToWatch,
globalConfig.hostname,
globalConfig.tabletTypes,
globalConfig.topoPort,
Expand Down Expand Up @@ -405,12 +424,13 @@ func (vc *VitessCluster) execTabletQuery(vttablet *cluster.VttabletProcess, quer
Uname: "vt_dba",
}
ctx := context.Background()
if conn, err := mysql.Connect(ctx, &vtParams); err != nil {
var conn *mysql.Conn
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
return nil, err
} else {
qr, err := conn.ExecuteFetch(query, 1000, true)
return qr, err
}
qr, err := conn.ExecuteFetch(query, 1000, true)
return qr, err
}

func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName string, tabletType string) map[string]*cluster.VttabletProcess {
Expand Down

0 comments on commit f3066ec

Please sign in to comment.