Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry for dialPumpClient #4879

Merged
merged 6 commits into from
Oct 24, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/store/localstore/boltdb"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
Expand Down Expand Up @@ -163,9 +164,10 @@ func setupBinlogClient() {
dialerOpt := grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
})
clientCon, err := grpc.Dial(cfg.BinlogSocket, dialerOpt, grpc.WithInsecure())
var clientConn *grpc.ClientConn
err := tidb.DialPumpClientWithRetry(cfg.BinlogSocket, clientConn, util.DefaultMaxRetries, dialerOpt)
terror.MustNil(err)
binloginfo.SetPumpClient(binlog.NewPumpClient(clientCon))
binloginfo.SetPumpClient(binlog.NewPumpClient(clientConn))
log.Infof("created binlog client at %s", cfg.BinlogSocket)
}

Expand Down
27 changes: 21 additions & 6 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/types"
"google.golang.org/grpc"
)

// Engine prefix name
const (
EngineGoLevelDBMemory = "memory://"
defaultMaxRetries = 30
retryInterval uint64 = 500
EngineGoLevelDBMemory = "memory://"
)

type domainMap struct {
Expand All @@ -66,7 +65,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
ddlLease = schemaLease
statisticLease = statsLease
}
err = util.RunWithRetry(defaultMaxRetries, retryInterval, func() (retry bool, err1 error) {
err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (retry bool, err1 error) {
log.Infof("store %v new domain, ddl lease %v, stats lease %d", store.UUID(), ddlLease, statisticLease)
factory := createSessionFunc(store)
sysFactory := createSessionWithDomainFunc(store)
Expand Down Expand Up @@ -232,7 +231,7 @@ func RegisterLocalStore(name string, driver engine.Driver) error {
//
// The engine should be registered before creating storage.
func NewStore(path string) (kv.Storage, error) {
return newStoreWithRetry(path, defaultMaxRetries)
return newStoreWithRetry(path, util.DefaultMaxRetries)
}

func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
Expand All @@ -248,13 +247,29 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
}

var s kv.Storage
err1 := util.RunWithRetry(maxRetries, retryInterval, func() (bool, error) {
err1 := util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("new store")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we log this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For recording this process.

s, err = d.Open(path)
return kv.IsRetryableError(err), err
})
return s, errors.Trace(err1)
}

// DialPumpClientWithRetry tries to dial to binlogSocket,
// if any error happens, it will try to re-dial,
// or return this error when timeout.
func DialPumpClientWithRetry(binlogSocket string, clientCon *grpc.ClientConn, maxRetries int, dialerOpt grpc.DialOption) error {
return util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("setup binlog client")
var err error
clientCon, err = grpc.Dial(binlogSocket, grpc.WithInsecure(), dialerOpt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should log something if meet this error.

if err != nil {
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}
return true, errors.Trace(err)
})
}

var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"}

func trimSQL(sql string) string {
Expand Down
17 changes: 17 additions & 0 deletions tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
"google.golang.org/grpc"
)

var store = flag.String("store", "memory", "registered store name, [memory, goleveldb, boltdb]")
Expand Down Expand Up @@ -122,6 +124,21 @@ func (s *testMainSuite) TestRetryOpenStore(c *C) {
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
}

func (s *testMainSuite) TestRetryDialPumpClient(c *C) {
retryDialPumpClientMustFail := func(binlogSocket string, clientCon *grpc.ClientConn, maxRetries int, dialerOpt grpc.DialOption) (err error) {
return util.RunWithRetry(maxRetries, 10, func() (bool, error) {
// Assume that it'll always return an error.
return true, errors.New("must fail")
})
}
begin := time.Now()
err := retryDialPumpClientMustFail("", nil, 3, nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "must fail")
elapse := time.Since(begin)
c.Assert(uint64(elapse), GreaterEqual, uint64(6*10*time.Millisecond))
}

func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
c.Skip("make leak should check it")
// TODO: testleak package should be able to find this leak.
Expand Down
7 changes: 7 additions & 0 deletions util/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ import (
"github.com/juju/errors"
)

const (
// DefaultMaxRetries indicates the max retry count.
DefaultMaxRetries = 30
// RetryInterval indicates retry interval.
RetryInterval uint64 = 500
)

// RunWithRetry will run the f with backoff and retry.
// retryCnt: Max retry count
// backoff: When run f failed, it will sleep backoff * triedCount time.Millisecond.
Expand Down