Skip to content

Commit

Permalink
Merge pull request #69 from ModEtchFill/failoverTest3c
Browse files Browse the repository at this point in the history
[#20] Failover on Error 1290 read-only
  • Loading branch information
ModEtchFill committed Aug 23, 2019
2 parents f696e5c + 3df1672 commit 42d0a4b
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 76 deletions.
40 changes: 34 additions & 6 deletions tests/unittest/failover/main_test.go
Expand Up @@ -55,7 +55,7 @@ func TestMain(m *testing.M) {
// startup two mysql DBs
ip1 = testutil.MakeMysql("mysql33",dbName)
ip2 = testutil.MakeMysql("mysql44",dbName)
os.Setenv("TWO_TASK", "tcp("+ip1+":3306)/"+dbName+"?timeout=1s||tcp("+ip2+":3306)/"+dbName+"?timeout=1s")
os.Setenv("TWO_TASK", "tcp("+ip1+":3306)/"+dbName+"?timeout=11s||tcp("+ip2+":3306)/"+dbName+"?timeout=11s")

/*
for {
Expand Down Expand Up @@ -140,17 +140,32 @@ func TestFailover(t *testing.T) {
}

func doCrud(conn *sql.Conn, id int, t* testing.T) (bool) {
note := time.Now().Format("test note 2006-01-02j15:04:05.000 failover")

ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)

stmt, err := conn.PrepareContext(ctx, "create table test_failover ( id int, note varchar(55) )")
//note := time.Now().Format("test note 2006-01-02j15:04:05.000 failover")
stmt, err := conn.PrepareContext(ctx, "drop table test_failover")
if err != nil {
return false
}
stmt.Exec()
//noTable := false
_,err = stmt.Exec()
if err != nil {
//noTable = true
}
// ignore errors since table might not exist

stmt, err = conn.PrepareContext(ctx, "create table test_failover ( id int, note varchar(55) )")
if err != nil {
return false
}
_,err = stmt.Exec()
if err != nil {
t.Fatalf("create table had issue %s",err.Error())
}
// ignore errors since table may already exist

/*
// not using txn since mysql
stmt, err = conn.PrepareContext(ctx, "insert into test_failover ( id , note ) values ( ?, ? )")
if err != nil {
Expand All @@ -161,6 +176,16 @@ func doCrud(conn *sql.Conn, id int, t* testing.T) (bool) {
t.Fatalf("Error exec test (insert table) %s\n", err.Error())
}
stmt, err = conn.PrepareContext(ctx, "insert into test_failover (id , note ) values ( ?, ? )")
if err != nil {
t.Fatalf("Error prep test (insert neg-id table) %s\n", err.Error())
}
_, err = stmt.Exec(-id, note)
if err != nil {
t.Fatalf("Error exec test (insert neg-id table) %s\n", err.Error())
}
/*
stmt, err = conn.PrepareContext(ctx, "select note from test_failover where id = ?")
if err != nil {
t.Fatalf("Error preparing test (sel table) %s\n", err.Error())
Expand All @@ -183,16 +208,19 @@ func doCrud(conn *sql.Conn, id int, t* testing.T) (bool) {
rows.Close()
stmt.Close()
// */


stmt, err = conn.PrepareContext(ctx, "delete from test_failover where id = ?")
/*
stmt, err := conn.PrepareContext(ctx, "delete from test_failover where id = ?")
if err != nil {
t.Fatalf("Error preparing test (del table) %s\n", err.Error())
}
_, err = stmt.Exec(id)
if err != nil {
t.Fatalf("Error preparing test (del table) %s\n", err.Error())
t.Fatalf("Error exec test (del table) %s\n", err.Error())
}
// */
return true
}

106 changes: 39 additions & 67 deletions tests/unittest/failover3/main_test.go
Expand Up @@ -56,20 +56,6 @@ func TestMain(m *testing.M) {
ip1 = testutil.MakeMysql("mysql33",dbName)
ip2 = testutil.MakeMysql("mysql44",dbName)
os.Setenv("TWO_TASK", "tcp("+ip1+":3306)/"+dbName+"?timeout=1s||tcp("+ip2+":3306)/"+dbName+"?timeout=1s")

/*
for {
conn, err := net.Dial("tcp", ip2+":3306")
if err != nil {
time.Sleep(1 * time.Second)
logger.GetLogger().Log(logger.Warning, "waiting for mysql server to come up")
continue
} else {
conn.Close()
break
}
} // */

os.Exit(testutil.UtilMain(m, cfg, before))
}

Expand All @@ -92,7 +78,7 @@ func doCrud(conn *sql.Conn, id int, t* testing.T) (bool) {

stmt, err := conn.PrepareContext(ctx, "create table test_failover ( id int, note varchar(55) )")
if err != nil {
commit(conn,t)
//commit(conn,t)
return false
}
stmt.Exec()
Expand All @@ -101,11 +87,17 @@ func doCrud(conn *sql.Conn, id int, t* testing.T) (bool) {
// not using txn since mysql
stmt, err = conn.PrepareContext(ctx, "insert into test_failover ( id , note ) values ( ?, ? )")
if err != nil {
t.Fatalf("Error preparing test (insert table) %s\n", err.Error())
// need to ignore when we're flushing out old connections
//commit(conn,t)
return false
//t.Fatalf("Error preparing test (insert table) %s\n", err.Error())
}
_, err = stmt.Exec(id, note)
if err != nil {
t.Fatalf("Error exec test (insert table) %s\n", err.Error())
// need to ignore when we're flushing out old connections
//commit(conn,t)
return false
//t.Fatalf("Error exec test (insert table) %s\n", err.Error())
}

stmt, err = conn.PrepareContext(ctx, "select note from test_failover where id = ?")
Expand Down Expand Up @@ -144,8 +136,8 @@ func doCrud(conn *sql.Conn, id int, t* testing.T) (bool) {
return true
}

func TestFailover2(t *testing.T) {
logger.GetLogger().Log(logger.Debug, "TestFailover2 begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
func TestFailover3(t *testing.T) {
logger.GetLogger().Log(logger.Debug, "TestFailover3 begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

shard := 0
db, err := sql.Open("heraloop", fmt.Sprintf("%d:0:0", shard))
Expand All @@ -172,37 +164,43 @@ func TestFailover2(t *testing.T) {
}
doCrud(conn, 1, t)

logger.GetLogger().Log(logger.Debug, "TestFailover2 taking out first db +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
logger.GetLogger().Log(logger.Debug, "TestFailover3 taking out first db +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

mysqlDirect("set global read_only = 1", t)

logger.GetLogger().Log(logger.Debug, "TestFailover2 taken out first db +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
//conn.Close() // ?

time.Sleep(4 * time.Second)
/* It's easier just to wait for some time instead of trying to flush
old connections */
logger.GetLogger().Log(logger.Debug, "TestFailover2 flush wait done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
logger.GetLogger().Log(logger.Debug, "TestFailover3 taken out first db +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

/*
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
conn2, err := db.Conn(ctx2)
if err != nil {
logger.GetLogger().Log(logger.Debug, "reacq conn "+err.Error())
/* some old connections won't have a recent heartbeat, so we'll lose
some queries. */
for i:=0;i<33;i++ {
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
conn2, err := db.Conn(ctx2)
if err != nil {
logger.GetLogger().Log(logger.Debug, "reacq conn "+err.Error())
}
defer conn2.Close()
didWork := doCrud(conn2, 2, t)
didWorkStr := "noWork"
if didWork {
didWorkStr = "workDone"
}
logger.GetLogger().Log(logger.Debug, "spinning checking conns "+didWorkStr+" "+fmt.Sprintf("%d loop", i))
time.Sleep(222 * time.Millisecond)
}
defer conn2.Close()
// */

logger.GetLogger().Log(logger.Debug, "TestFailover3 flush wait done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

conn2 := conn
didWork := doCrud(conn2, 2, t)
didWork = didWork || doCrud(conn2, 3, t)
didWork = didWork || doCrud(conn2, 4, t)
didWork = didWork || doCrud(conn2, 5, t)
if !didWork {
logger.GetLogger().Log(logger.Warning, "TestFailover2 post primary shutdown, no work done")
logger.GetLogger().Log(logger.Warning, "TestFailover3 post primary shutdown, no work done")
t.Fatalf("failed to do any work after primary shutdown")
}
logger.GetLogger().Log(logger.Debug, "TestFailover2 done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")
logger.GetLogger().Log(logger.Debug, "TestFailover3 done +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n")

// cleanup
mysqlDirect("set global read_only = 0", t)
Expand All @@ -211,36 +209,10 @@ func TestFailover2(t *testing.T) {


func mysqlDirect(query string, t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
fullDsn:=fmt.Sprintf("%s:%s@tcp(%s:3306)/%s",
os.Getenv("username"),
os.Getenv("password"),
ip1,
dbName)
//fmt.Println("fullDsn",fullDsn)
db0, err := sql.Open("mysql", fullDsn)
if err != nil {
t.Fatal("Error starting direct mysql:", err)
return
}
db0.SetMaxIdleConns(2)
defer db0.Close()
conn0, err := db0.Conn(ctx)
if err != nil {
t.Fatal("Error conn direct mysql:", err)
return
}
stmt0, err := conn0.PrepareContext(ctx, query)
if err != nil {
t.Fatal("Error prep direct mysql:", err)
return
}
_, err = stmt0.Exec()
if err != nil {
t.Fatal("Error exec direct mysql:", err)
return
}
err := testutil.MysqlDirect(query, ip1, dbName)
if err != nil {
t.Fatalf("mysqlDirect "+query+ip1+dbName+err.Error())
}
}


Expand Down
62 changes: 59 additions & 3 deletions tests/unittest/testutil/setup.go
Expand Up @@ -3,7 +3,9 @@ package testutil
import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"net"
"os"
"os/exec"
Expand Down Expand Up @@ -163,9 +165,6 @@ func MakeMysql(dockerName string, dbName string) (ip string) {
cmd.Run()
ipBuf.Truncate(ipBuf.Len()-1)

os.Setenv("username", "root")
os.Setenv("password", "1-testDb")

for {
conn, err := net.Dial("tcp", ipBuf.String()+":3306")
if err != nil {
Expand All @@ -178,6 +177,23 @@ func MakeMysql(dockerName string, dbName string) (ip string) {
}
}

os.Setenv("username", "root")
os.Setenv("password", "1-testDb")
q := "CREATE USER 'appuser'@'%' IDENTIFIED BY '1-testDb'"
logger.GetLogger().Log(logger.Warning, "set up app user:"+q)
err := MysqlDirect(q, ipBuf.String(), dbName)
if err != nil {
logger.GetLogger().Log(logger.Warning, "set up app user:"+q+" errored "+err.Error())
}
q = "GRANT ALL PRIVILEGES ON "+dbName+" . * TO 'appuser'@'%';"
logger.GetLogger().Log(logger.Warning, "grant app user:"+q)
err = MysqlDirect(q, ipBuf.String(), dbName)
if err != nil {
logger.GetLogger().Log(logger.Warning, "grant app user:"+q+" errored "+err.Error())
} else {
os.Setenv("username", "appuser")
}

return ipBuf.String()
}
func CleanMysql(dockerName string) {
Expand All @@ -187,6 +203,46 @@ func CleanMysql(dockerName string) {
cleanCmd.Run()
}

var dbs map[string]*sql.DB
func MysqlDirect(query string, ip string, dbName string) (error) {
if dbs == nil {
dbs = make(map[string]*sql.DB)
}
db0, ok := dbs[ip+dbName]
if !ok {
fullDsn:=fmt.Sprintf("%s:%s@tcp(%s:3306)/%s",
os.Getenv("username"),
os.Getenv("password"),
ip,
dbName)
//fmt.Println("fullDsn",fullDsn)
var err error
db0, err = sql.Open("mysql", fullDsn)
if err != nil {
return err
}
db0.SetMaxIdleConns(0)
// defer db0.Close()
dbs[ip+dbName] = db0
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
conn0, err := db0.Conn(ctx)
if err != nil {
return err
}
defer conn0.Close()
stmt0, err := conn0.PrepareContext(ctx, query)
if err != nil {
return err
}
defer stmt0.Close()
_, err = stmt0.Exec()
if err != nil {
return err
}
return nil
}

func (m *mux) StartServer() error {
// setup working dir
m.setupWorkdir()
Expand Down
1 change: 1 addition & 0 deletions worker/mysqlworker/adapter.go
Expand Up @@ -195,6 +195,7 @@ func (adapter *mysqlAdapter) ProcessError(errToProcess error, workerScope *share
case 1159: fallthrough // read timeout
case 1160: fallthrough // err write
case 1161: fallthrough // write timeout
case 1290: fallthrough // read-only mode
case 1317: fallthrough // query interupt
case 1836: fallthrough // read-only mode
case 1874: fallthrough // innodb read-only
Expand Down

0 comments on commit 42d0a4b

Please sign in to comment.