Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

binlog: use pumps client to write binlog (#7659) (#8078) #8822

Merged
merged 1 commit into from Dec 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/config.go
Expand Up @@ -177,7 +177,7 @@ type PreparedPlanCache struct {

// OpenTracing is the opentracing section of the config.
type OpenTracing struct {
Enable bool `toml:"enable" json:"enbale"`
Enable bool `toml:"enable" json:"enable"`
Sampler OpenTracingSampler `toml:"sampler" json:"sampler"`
Reporter OpenTracingReporter `toml:"reporter" json:"reporter"`
RPCMetrics bool `toml:"rpc-metrics" json:"rpc-metrics"`
Expand Down Expand Up @@ -231,7 +231,7 @@ type TiKVClient struct {

// Binlog is the config for binlog.
type Binlog struct {
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Enable bool `toml:"enable" json:"enable"`
WriteTimeout string `toml:"write-timeout" json:"write-timeout"`
// If IgnoreError is true, when writing binlog meets error, TiDB would
// ignore the error.
Expand Down
5 changes: 2 additions & 3 deletions config/config.toml.example
Expand Up @@ -232,9 +232,8 @@ commit-timeout = "41s"
max-txn-time-use = 590

[binlog]

# Socket file to write binlog.
binlog-socket = ""
# Enable writing binlog.
enable = false

# WriteTimeout specifies how long it will wait for writing binlog to pump.
write-timeout = "15s"
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Expand Up @@ -33,7 +33,7 @@ func TestT(t *testing.T) {

func (s *testConfigSuite) TestConfig(c *C) {
conf := new(Config)
conf.Binlog.BinlogSocket = "/tmp/socket"
conf.Binlog.Enable = true
conf.Binlog.IgnoreError = true
conf.Performance.RetryLimit = 20
conf.TiKVClient.CommitTimeout = "10s"
Expand All @@ -54,7 +54,7 @@ commit-timeout="41s"`)
c.Assert(conf.Load(configFile), IsNil)

// Test that the original value will not be cleared by loading a config file that does not contain the option.
c.Assert(conf.Binlog.BinlogSocket, Equals, "/tmp/socket")
c.Assert(conf.Binlog.Enable, Equals, true)

// Test that the value will be overwritten by the config file.
c.Assert(conf.Performance.RetryLimit, Equals, uint(10))
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Expand Up @@ -284,7 +284,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
schemaSyncer: syncer,
workerVars: variable.NewSessionVars(),
}
d.workerVars.BinlogClient = binloginfo.GetPumpClient()
d.workerVars.BinlogClient = binloginfo.GetPumpsClient()

if ctxPool != nil {
supportDelRange := store.SupportDeleteRange()
Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Expand Up @@ -313,7 +313,7 @@ func (s *session) doCommit(ctx context.Context) error {
Tp: binlog.BinlogType_Prewrite,
PrewriteValue: prewriteData,
},
Client: s.sessionVars.BinlogClient.(binlog.PumpClient),
Client: s.sessionVars.BinlogClient,
}
s.txn.SetOption(kv.BinlogInfo, info)
}
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func createSession(store kv.Storage) (*session, error) {
domain.BindDomain(s, dom)
// session implements variable.GlobalVarAccessor. Bind it to ctx.
s.sessionVars.GlobalVarsAccessor = s
s.sessionVars.BinlogClient = binloginfo.GetPumpClient()
s.sessionVars.BinlogClient = binloginfo.GetPumpsClient()
s.txn.init()
return s, nil
}
Expand Down
3 changes: 2 additions & 1 deletion session/session_test.go
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/privilege/privileges"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
Expand Down Expand Up @@ -130,7 +131,7 @@ func (s *testSessionSuite) TestForCoverage(c *C) {
tk.MustExec("admin check table t")

// Cover dirty table operations in StateTxn.
tk.Se.GetSessionVars().BinlogClient = &mockBinlogPump{}
tk.Se.GetSessionVars().BinlogClient = binloginfo.MockPumpsClient(&mockBinlogPump{})
tk.MustExec("begin")
tk.MustExec("truncate table t")
tk.MustExec("insert t values ()")
Expand Down
29 changes: 0 additions & 29 deletions session/tidb.go
Expand Up @@ -38,8 +38,6 @@ import (
"github.com/pingcap/tidb/util/chunk"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type domainMap struct {
Expand Down Expand Up @@ -306,33 +304,6 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
return s, errors.Trace(err)
}

// DialPumpClientWithRetry dials binlogSocket via util.RunWithRetry, using
// maxRetries and util.RetryInterval as the retry parameters.
// The transport is TLS when the global security config yields a TLS config,
// otherwise the connection is insecure. dialerOpt is passed to grpc.Dial as-is.
// It returns the connection assigned by the last dial attempt together with the
// traced error reported by util.RunWithRetry.
func DialPumpClientWithRetry(binlogSocket string, maxRetries int, dialerOpt grpc.DialOption) (*grpc.ClientConn, error) {
var clientCon *grpc.ClientConn
err := util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) {
log.Infof("setup binlog client")
var err error
tlsConfig, err := config.GetGlobalConfig().Security.ToTLSConfig()
if err != nil {
// The TLS config error is only logged here; err is overwritten by the
// grpc.Dial call below, so it is never returned to the caller.
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}

// Use TLS credentials when a TLS config is available, otherwise dial insecurely.
if tlsConfig != nil {
clientCon, err = grpc.Dial(binlogSocket, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), dialerOpt)
} else {
clientCon, err = grpc.Dial(binlogSocket, grpc.WithInsecure(), dialerOpt)
}

if err != nil {
log.Infof("error happen when setting binlog client: %s", errors.ErrorStack(err))
}
// NOTE(review): the first result is always true; presumably it marks the
// error as retryable for util.RunWithRetry — confirm against that helper.
return true, errors.Trace(err)
})
return clientCon, errors.Trace(err)
}

var queryStmtTable = []string{"explain", "select", "show", "execute", "describe", "desc", "admin"}

func trimSQL(sql string) string {
Expand Down
17 changes: 0 additions & 17 deletions session/tidb_test.go
Expand Up @@ -30,12 +30,10 @@ import (
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/testleak"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -124,21 +122,6 @@ func (s *testMainSuite) TestRetryOpenStore(c *C) {
c.Assert(uint64(elapse), GreaterEqual, uint64(3*time.Second))
}

// TestRetryDialPumpClient checks that util.RunWithRetry surfaces the error of a
// callback that always fails, and that the failing run takes at least 60ms when
// called with maxRetries=3 and an interval argument of 10.
func (s *testMainSuite) TestRetryDialPumpClient(c *C) {
// The helper mirrors the dial function's shape, but binlogSocket, clientCon and
// dialerOpt are unused: only the retry behaviour is exercised.
retryDialPumpClientMustFail := func(binlogSocket string, clientCon *grpc.ClientConn, maxRetries int, dialerOpt grpc.DialOption) (err error) {
return util.RunWithRetry(maxRetries, 10, func() (bool, error) {
// Assume that it'll always return an error.
return true, errors.New("must fail")
})
}
begin := time.Now()
err := retryDialPumpClientMustFail("", nil, 3, nil)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "must fail")
elapse := time.Since(begin)
// NOTE(review): the 6*10ms lower bound presumably follows from RunWithRetry's
// backoff schedule for 3 attempts — confirm against util.RunWithRetry.
c.Assert(uint64(elapse), GreaterEqual, uint64(6*10*time.Millisecond))
}

func (s *testMainSuite) TestSysSessionPoolGoroutineLeak(c *C) {
c.Skip("make leak should check it")
// TODO: testleak package should be able to find this leak.
Expand Down
111 changes: 68 additions & 43 deletions sessionctx/binloginfo/binloginfo.go
Expand Up @@ -14,45 +14,49 @@
package binloginfo

import (
"io/ioutil"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/juju/errors"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
)

func init() {
grpc.EnableTracing = false
// Discard the pumps client's log output so it does not clutter TiDB's log.
pumpcli.Logger.Out = ioutil.Discard
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not to output?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first, zhou sir didn't want the pump client to print any log, since it may disturb TiDB's log, so I added this code.
We can refine the pump client's log later, and remove this line if any other reviewer agrees.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hard to debug, create an issue for it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I will create an issue.

}

var binlogWriteTimeout = 15 * time.Second

// pumpClient is the gRPC client to write binlog, it is opened on server start and never close,
// pumpsClient is the client to write binlog, it is opened on server start and never closed,
// shared by all sessions.
var pumpClient binlog.PumpClient
var pumpClientLock sync.RWMutex
var pumpsClient *pumpcli.PumpsClient
var pumpsClientLock sync.RWMutex

// BinlogInfo contains binlog data and binlog client.
type BinlogInfo struct {
Data *binlog.Binlog
Client binlog.PumpClient
Client *pumpcli.PumpsClient
}

// GetPumpClient gets the pump client instance.
func GetPumpClient() binlog.PumpClient {
pumpClientLock.RLock()
client := pumpClient
pumpClientLock.RUnlock()
// GetPumpsClient gets the pumps client instance.
func GetPumpsClient() *pumpcli.PumpsClient {
pumpsClientLock.RLock()
client := pumpsClient
pumpsClientLock.RUnlock()
return client
}

Expand All @@ -65,11 +69,11 @@ func SetGRPCTimeout(timeout time.Duration) {
binlogWriteTimeout = timeout
}

// SetPumpClient sets the pump client instance.
func SetPumpClient(client binlog.PumpClient) {
pumpClientLock.Lock()
pumpClient = client
pumpClientLock.Unlock()
// SetPumpsClient sets the pumps client instance.
func SetPumpsClient(client *pumpcli.PumpsClient) {
pumpsClientLock.Lock()
pumpsClient = client
pumpsClientLock.Unlock()
}

// GetPrewriteValue gets binlog prewrite value in the context.
Expand Down Expand Up @@ -111,58 +115,47 @@ func (info *BinlogInfo) WriteBinlog(clusterID uint64) error {
return nil
}

commitData, err := info.Data.Marshal()
if err != nil {
return errors.Trace(err)
}
req := &binlog.WriteBinlogReq{ClusterID: clusterID, Payload: commitData}

// Retry many times because we may raise CRITICAL error here.
for i := 0; i < 20; i++ {
var resp *binlog.WriteBinlogResp
ctx, cancel := context.WithTimeout(context.Background(), binlogWriteTimeout)
resp, err = info.Client.WriteBinlog(ctx, req)
cancel()
if err == nil && resp.Errmsg != "" {
err = errors.New(resp.Errmsg)
}
if err == nil {
return nil
}
if strings.Contains(err.Error(), "received message larger than max") {
// This kind of error is not critical and not retryable, return directly.
return errors.Errorf("binlog data is too large (%s)", err.Error())
}
log.Errorf("write binlog error %v", err)
time.Sleep(time.Second)
if info.Client == nil {
return errors.New("pumps client is nil")
}

// PumpsClient retries internally if writing the binlog fails.
err := info.Client.WriteBinlog(info.Data)
if err != nil {
log.Errorf("write binlog fail %v", errors.ErrorStack(err))
if atomic.LoadUint32(&ignoreError) == 1 {
log.Errorf("critical error, write binlog fail but error ignored: %s", errors.ErrorStack(err))
log.Error("write binlog fail but error ignored")
metrics.CriticalErrorCounter.Add(1)
// If error happens once, we'll stop writing binlog.
atomic.CompareAndSwapUint32(&skipBinlog, skip, skip+1)
return nil
}

if strings.Contains(err.Error(), "received message larger than max") {
// This kind of error is not critical, return directly.
return errors.Errorf("binlog data is too large (%s)", err.Error())
}

return terror.ErrCritical.GenByArgs(err)
}

return terror.ErrCritical.GenByArgs(err)
return nil
}

// SetDDLBinlog sets DDL binlog in the kv.Transaction.
func SetDDLBinlog(client interface{}, txn kv.Transaction, jobID int64, ddlQuery string) {
func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64, ddlQuery string) {
if client == nil {
return
}

ddlQuery = addSpecialComment(ddlQuery)
info := &BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
DdlJobId: jobID,
DdlQuery: []byte(ddlQuery),
},
Client: client.(binlog.PumpClient),
Client: client,
}
txn.SetOption(kv.BinlogInfo, info)
}
Expand All @@ -182,3 +175,35 @@ func addSpecialComment(ddlQuery string) string {
}
return ddlQuery[:loc[0]] + specialPrefix + ddlQuery[loc[0]:loc[1]] + ` */` + ddlQuery[loc[1]:]
}

// MockPumpsClient creates a PumpsClient, used for test.
// The returned client contains exactly one pump, with node ID "pump-1", whose
// Client is the given binlog.PumpClient. The pump is marked online and available.
func MockPumpsClient(client binlog.PumpClient) *pumpcli.PumpsClient {
nodeID := "pump-1"
// The single pump status that wraps the caller-supplied client.
pump := &pumpcli.PumpStatus{
Status: node.Status{
NodeID: nodeID,
State: node.Online,
},
IsAvaliable: true,
Client: client,
}

// Register the pump in the full set and in the available set; the
// unavailable set is left empty.
pumpInfos := &pumpcli.PumpInfos{
Pumps: make(map[string]*pumpcli.PumpStatus),
AvaliablePumps: make(map[string]*pumpcli.PumpStatus),
UnAvaliablePumps: make(map[string]*pumpcli.PumpStatus),
}
pumpInfos.Pumps[nodeID] = pump
pumpInfos.AvaliablePumps[nodeID] = pump

// ClusterID and RetryTime are fixed to 1; the write timeout is taken from the
// package-level binlogWriteTimeout at the time of the call.
pCli := &pumpcli.PumpsClient{
ClusterID: 1,
Pumps: pumpInfos,
Selector: pumpcli.NewSelector(pumpcli.Range),
RetryTime: 1,
BinlogWriteTimeout: binlogWriteTimeout,
}
// Hand the pump to the selector as its only candidate.
pCli.Selector.SetPumps([]*pumpcli.PumpStatus{pump})

return pCli
}
5 changes: 3 additions & 2 deletions sessionctx/binloginfo/binloginfo_test.go
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/juju/errors"
. "github.com/pingcap/check"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -80,7 +81,7 @@ type testBinlogSuite struct {
unixFile string
serv *grpc.Server
pump *mockBinlogPump
client binlog.PumpClient
client *pumpcli.PumpsClient
ddl ddl.DDL
}

Expand Down Expand Up @@ -111,7 +112,7 @@ func (s *testBinlogSuite) SetUpSuite(c *C) {
sessionDomain := domain.GetDomain(tk.Se.(sessionctx.Context))
s.ddl = sessionDomain.DDL()

s.client = binlog.NewPumpClient(clientCon)
s.client = binloginfo.MockPumpsClient(binlog.NewPumpClient(clientCon))
s.ddl.WorkerVars().BinlogClient = s.client
}

Expand Down