Skip to content

Commit

Permalink
Merge pull request #6408 from arindamnayak/pitr-2
Browse files Browse the repository at this point in the history
Implemented PITR-2
  • Loading branch information
deepthi committed Jul 18, 2020
2 parents f3066ec + 878c871 commit e86a967
Show file tree
Hide file tree
Showing 24 changed files with 888 additions and 19 deletions.
@@ -1,10 +1,11 @@
name: misc test
name: docker_test_1
on: [push, pull_request]
jobs:

build:
name: Misc Test
name: Docker Test 1
runs-on: ubuntu-latest

steps:

- name: Set up Go
Expand All @@ -15,6 +16,6 @@ jobs:
- name: Check out code
uses: actions/checkout@v2

- name: Run Misc test which requires docker
- name: Run tests which requires docker 1
run: |
go run test.go -docker=true -shard 25
go run test.go -docker=true --follow -shard 10
@@ -1,10 +1,11 @@
name: cluster_vtctl_web
name: docker test 2
on: [push, pull_request]
jobs:

build:
name: cluster vtctl web
name: Docker Test 2
runs-on: ubuntu-latest

steps:

- name: Set up Go
Expand All @@ -15,8 +16,6 @@ jobs:
- name: Check out code
uses: actions/checkout@v2

- name: Run vtctl web
- name: Run tests which requires docker - 2
run: |
# Running web test inside docker
go run test.go -docker=true -print-log -shard 10
go run test.go -docker=true --follow -shard 25
1 change: 1 addition & 0 deletions build.env
Expand Up @@ -43,3 +43,4 @@ mkdir -p .git/hooks
ln -sf "$PWD/misc/git/pre-commit" .git/hooks/pre-commit
ln -sf "$PWD/misc/git/commit-msg" .git/hooks/commit-msg
git config core.hooksPath .git/hooks
export EXTRA_BIN=$PWD/test/bin
2 changes: 2 additions & 0 deletions docker/test/run.sh
Expand Up @@ -139,6 +139,7 @@ args="$args -v /tmp/mavencache:/home/vitess/.m2"

# Add in the vitess user
args="$args --user vitess"
args="$args -v $PWD/test/bin:/tmp/bin"

# Mount in host VTDATAROOT if one exists, since it might be a RAM disk or SSD.
if [[ -n "$VTDATAROOT" ]]; then
Expand Down Expand Up @@ -172,6 +173,7 @@ fi
# Reset the environment if this was an old bootstrap. We can detect this from VTTOP presence.
bashcmd=$(append_cmd "$bashcmd" "export VTROOT=/vt/src/vitess.io/vitess")
bashcmd=$(append_cmd "$bashcmd" "export VTDATAROOT=/vt/vtdataroot")
bashcmd=$(append_cmd "$bashcmd" "export EXTRA_BIN=/tmp/bin")

bashcmd=$(append_cmd "$bashcmd" "mkdir -p dist; mkdir -p bin; mkdir -p lib; mkdir -p vthook")
bashcmd=$(append_cmd "$bashcmd" "rm -rf /vt/dist; ln -s /vt/src/vitess.io/vitess/dist /vt/dist")
Expand Down
7 changes: 7 additions & 0 deletions go/mysql/filepos_gtid.go
Expand Up @@ -147,6 +147,13 @@ func (gtid filePosGTID) Union(other GTIDSet) GTIDSet {
return filePosOther
}

// Last returns last filePosition
// For filePos based GTID we have only one position
// here we will just return the current filePos
func (gtid filePosGTID) Last() string {
return gtid.String()
}

func init() {
gtidParsers[FilePosFlavorID] = parseFilePosGTID
gtidSetParsers[FilePosFlavorID] = parseFilePosGTIDSet
Expand Down
3 changes: 2 additions & 1 deletion go/mysql/flavor_mysql.go
Expand Up @@ -32,7 +32,8 @@ type mysqlFlavor struct{}

// masterGTIDSet is part of the Flavor interface.
func (mysqlFlavor) masterGTIDSet(c *Conn) (GTIDSet, error) {
qr, err := c.ExecuteFetch("SELECT @@GLOBAL.gtid_executed", 1, false)
// keep @@global as lowercase, as some servers like the Ripple binlog server only honors a lowercase `global` value
qr, err := c.ExecuteFetch("SELECT @@global.gtid_executed", 1, false)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions go/mysql/gtid_set.go
Expand Up @@ -49,6 +49,9 @@ type GTIDSet interface {

// Union returns a union of the receiver GTIDSet and the supplied GTIDSet.
Union(GTIDSet) GTIDSet

// Union returns a union of the receiver GTIDSet and the supplied GTIDSet.
Last() string
}

// gtidSetParsers maps flavor names to parser functions. It is used by
Expand Down
1 change: 1 addition & 0 deletions go/mysql/gtid_test.go
Expand Up @@ -193,6 +193,7 @@ type fakeGTID struct {
}

func (f fakeGTID) String() string { return f.value }
func (f fakeGTID) Last() string { panic("not implemented") }
func (f fakeGTID) Flavor() string { return f.flavor }
func (fakeGTID) SourceServer() interface{} { return int(1) }
func (fakeGTID) SequenceNumber() interface{} { return int(1) }
Expand Down
15 changes: 15 additions & 0 deletions go/mysql/mariadb_gtid.go
Expand Up @@ -233,6 +233,21 @@ func (gtidSet MariadbGTIDSet) Union(other GTIDSet) GTIDSet {
return newSet
}

//Last returns the last gtid
func (gtidSet MariadbGTIDSet) Last() string {
// Sort domains so the string format is deterministic.
domains := make([]uint32, 0, len(gtidSet))
for domain := range gtidSet {
domains = append(domains, domain)
}
sort.Slice(domains, func(i, j int) bool {
return domains[i] < domains[j]
})

lastGTID := domains[len(gtidSet)-1]
return gtidSet[lastGTID].String()
}

// deepCopy returns a deep copy of the set.
func (gtidSet MariadbGTIDSet) deepCopy() MariadbGTIDSet {
newSet := make(MariadbGTIDSet, len(gtidSet))
Expand Down
16 changes: 16 additions & 0 deletions go/mysql/mariadb_gtid_test.go
Expand Up @@ -19,6 +19,9 @@ package mysql
import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParseMariaGTID(t *testing.T) {
Expand Down Expand Up @@ -640,3 +643,16 @@ func TestMariaGTIDSetUnionNewDomain(t *testing.T) {
t.Error("Union result was not of type MariadbGTIDSet.")
}
}

func TestMariaGTIDSetLast(t *testing.T) {

testCases := map[string]string{
"12-34-5678,11-22-3333,24-52-4523": "24-52-4523",
"12-34-5678": "12-34-5678",
}
for input, want := range testCases {
got, err := parseMariadbGTIDSet(input)
require.NoError(t, err)
assert.Equal(t, want, got.Last())
}
}
20 changes: 20 additions & 0 deletions go/mysql/mysql56_gtid_set.go
Expand Up @@ -171,6 +171,26 @@ func (set Mysql56GTIDSet) String() string {
return buf.String()
}

// Last returns the last gtid as string
// For gtidset having multiple SIDs or multiple intervals
// it just returns the last SID with last interval
func (set Mysql56GTIDSet) Last() string {
buf := &bytes.Buffer{}

if len(set.SIDs()) > 0 {
sid := set.SIDs()[len(set.SIDs())-1]
buf.WriteString(sid.String())
sequences := set[sid]
if len(sequences) > 0 {
buf.WriteByte(':')
lastInterval := sequences[len(sequences)-1]
buf.WriteString(strconv.FormatInt(lastInterval.end, 10))
}
}

return buf.String()
}

// Flavor implements GTIDSet.
func (Mysql56GTIDSet) Flavor() string { return Mysql56FlavorID }

Expand Down
35 changes: 35 additions & 0 deletions go/mysql/mysql56_gtid_set_test.go
Expand Up @@ -21,6 +21,8 @@ import (
"sort"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSortSIDList(t *testing.T) {
Expand Down Expand Up @@ -527,3 +529,36 @@ func TestMysql56GTIDSetSIDBlock(t *testing.T) {
t.Errorf("NewMysql56GTIDSetFromSIDBlock(%#v) = %#v, want %#v", want, set, input)
}
}

func TestMySQL56GTIDSetLast(t *testing.T) {
sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
sid2 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 255}

table := map[string]Mysql56GTIDSet{
// Simple case
"00010203-0405-0607-0809-0a0b0c0d0e0f:5": {
sid1: []interval{{1, 5}},
},
"00010203-0405-0607-0809-0a0b0c0d0e0f:3": {
sid1: []interval{{end: 3}},
},
// Interval with same start and end
"00010203-0405-0607-0809-0a0b0c0d0e0f:12": {
sid1: []interval{{12, 12}},
},
// Multiple intervals
"00010203-0405-0607-0809-0a0b0c0d0e0f:20": {
sid1: []interval{{1, 5}, {10, 20}},
},
// Multiple SIDs
"00010203-0405-0607-0809-0a0b0c0d0eff:50": {
sid1: []interval{{1, 5}, {10, 20}},
sid2: []interval{{1, 5}, {50, 50}},
},
}

for want, input := range table {
got := strings.ToLower(input.Last())
assert.Equal(t, want, got)
}
}
11 changes: 8 additions & 3 deletions go/test/endtoend/cluster/cluster_util.go
Expand Up @@ -73,11 +73,11 @@ func GetMasterPosition(t *testing.T, vttablet Vttablet, hostname string) (string
return pos, gtID
}

// VerifyRowsInTablet Verify total number of rows in a tablet
func VerifyRowsInTablet(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int) {
// VerifyRowsInTabletForTable Verify total number of rows in a table
func VerifyRowsInTabletForTable(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int, tableName string) {
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
qr, err := vttablet.VttabletProcess.QueryTablet("select * from vt_insert_test", ksName, true)
qr, err := vttablet.VttabletProcess.QueryTablet("select * from "+tableName, ksName, true)
require.Nil(t, err)
if len(qr.Rows) == expectedRows {
return
Expand All @@ -87,6 +87,11 @@ func VerifyRowsInTablet(t *testing.T, vttablet *Vttablet, ksName string, expecte
assert.Fail(t, "expected rows not found.")
}

// VerifyRowsInTablet Verify total number of rows in a tablet
func VerifyRowsInTablet(t *testing.T, vttablet *Vttablet, ksName string, expectedRows int) {
VerifyRowsInTabletForTable(t, vttablet, ksName, expectedRows, "vt_insert_test")
}

// PanicHandler handles the panic in the testcase.
func PanicHandler(t *testing.T) {
err := recover()
Expand Down
110 changes: 110 additions & 0 deletions go/test/endtoend/recovery/pitr/binlog_server.go
@@ -0,0 +1,110 @@
package pitr

import (
"fmt"
"os"
"os/exec"
"path"
"strings"
"syscall"
"time"

"vitess.io/vitess/go/vt/log"
)

const (
binlogExecutableName = "rippled"
binlogDataDir = "binlog_dir"
binlogUser = "ripple"
)

type binLogServer struct {
hostname string
port int
username string
dataDirectory string
executablePath string

proc *exec.Cmd
exit chan error
}

type mysqlMaster struct {
hostname string
port int
username string
password string
}

// newBinlogServer returns an instance of binlog server
func newBinlogServer(hostname string, port int) (*binLogServer, error) {
dataDir := path.Join(os.Getenv("VTDATAROOT"), binlogDataDir)
fmt.Println(dataDir)
if _, err := os.Stat(dataDir); os.IsNotExist(err) {
err := os.Mkdir(dataDir, 0700)
if err != nil {
log.Error(err)
return nil, err
}
}
return &binLogServer{
executablePath: path.Join(os.Getenv("EXTRA_BIN"), binlogExecutableName),
dataDirectory: dataDir,
username: binlogUser,
hostname: hostname,
port: port,
}, nil
}

// start starts the binlog server points to running mysql port
func (bs *binLogServer) start(master mysqlMaster) error {
bs.proc = exec.Command(
bs.executablePath,
fmt.Sprintf("-ripple_datadir=%s", bs.dataDirectory),
fmt.Sprintf("-ripple_master_address=%s", master.hostname),
fmt.Sprintf("-ripple_master_port=%d", master.port),
fmt.Sprintf("-ripple_master_user=%s", master.username),
fmt.Sprintf("-ripple_server_ports=%d", bs.port),
)
if master.password != "" {
bs.proc.Args = append(bs.proc.Args, fmt.Sprintf("-ripple_master_password=%s", master.password))
}

errFile, _ := os.Create(path.Join(bs.dataDirectory, "log.txt"))
bs.proc.Stderr = errFile

bs.proc.Env = append(bs.proc.Env, os.Environ()...)

log.Infof("Running binlog server with command: %v", strings.Join(bs.proc.Args, " "))

err := bs.proc.Start()
if err != nil {
return err
}
bs.exit = make(chan error)
go func() {
if bs.proc != nil {
bs.exit <- bs.proc.Wait()
}
}()
return nil
}

func (bs *binLogServer) stop() error {
if bs.proc == nil || bs.exit == nil {
return nil
}
// Attempt graceful shutdown with SIGTERM first
bs.proc.Process.Signal(syscall.SIGTERM)

select {
case err := <-bs.exit:
bs.proc = nil
return err

case <-time.After(10 * time.Second):
bs.proc.Process.Kill()
bs.proc = nil
return <-bs.exit
}
}

0 comments on commit e86a967

Please sign in to comment.