Skip to content

Commit

Permalink
*: support SyncLog kv request option (#4689)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Sep 30, 2017
1 parent 3f4bdf6 commit e6360b1
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 1 deletion.
34 changes: 33 additions & 1 deletion executor/executor_test.go
Expand Up @@ -1957,12 +1957,14 @@ const (
checkRequestOff = 0
checkRequestPriority = 1
checkRequestNotFillCache = 2
checkRequestSyncLog = 3
)

type checkRequestClient struct {
tikv.Client
priority pb.CommandPri
notFillCache bool
syncLog bool
mu struct {
sync.RWMutex
checkFlags uint32
Expand All @@ -1985,6 +1987,13 @@ func (c *checkRequestClient) SendReq(ctx goctx.Context, addr string, req *tikvrp
if c.notFillCache != req.NotFillCache {
return nil, errors.New("fail to set not fail cache")
}
} else if checkFlags == checkRequestSyncLog {
switch req.Type {
case tikvrpc.CmdPrewrite, tikvrpc.CmdCommit:
if c.syncLog != req.SyncLog {
return nil, errors.New("fail to set sync log")
}
}
}
return resp, err
}
Expand Down Expand Up @@ -2021,6 +2030,7 @@ func (s *testContextOptionSuite) TestCoprocessorPriority(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t (id int primary key)")
defer tk.MustExec("drop table t")
tk.MustExec("insert into t values (1)")

cli := s.cli
Expand Down Expand Up @@ -2057,13 +2067,13 @@ func (s *testContextOptionSuite) TestCoprocessorPriority(c *C) {
cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()
tk.MustExec("drop table t")
}

func (s *testContextOptionSuite) TestNotFillCache(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t (id int primary key)")
defer tk.MustExec("drop table t")
tk.MustExec("insert into t values (1)")

cli := s.cli
Expand All @@ -2076,6 +2086,28 @@ func (s *testContextOptionSuite) TestNotFillCache(c *C) {
cli.notFillCache = false
tk.MustQuery("select SQL_CACHE * from t")
tk.MustQuery("select * from t")

cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()
}

func (s *testContextOptionSuite) TestSyncLog(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

cli := s.cli
cli.mu.Lock()
cli.mu.checkFlags = checkRequestSyncLog
cli.mu.Unlock()
cli.syncLog = true
tk.MustExec("create table t (id int primary key)")
cli.syncLog = false
tk.MustExec("insert into t values (1)")

cli.mu.Lock()
cli.mu.checkFlags = checkRequestOff
cli.mu.Unlock()
}

func (s *testSuite) TestHandleTransfer(c *C) {
Expand Down
4 changes: 4 additions & 0 deletions kv/kv.go
Expand Up @@ -40,6 +40,8 @@ const (
Priority
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog
)

// Priority value for transaction priority.
Expand Down Expand Up @@ -181,6 +183,8 @@ type Request struct {
Priority int
// NotFillCache makes this request do not touch the LRU cache of the underlying storage.
NotFillCache bool
// SyncLog decides whether the WAL(write-ahead log) of this request should be synchronized.
SyncLog bool
}

// Response represents the response returned from KV layer.
Expand Down
1 change: 1 addition & 0 deletions meta/meta.go
Expand Up @@ -84,6 +84,7 @@ type Meta struct {
// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
txn.SetOption(kv.Priority, kv.PriorityHigh)
txn.SetOption(kv.SyncLog, true)
t := structure.NewStructure(txn, txn, mMetaPrefix)
return &Meta{txn: t}
}
Expand Down
18 changes: 18 additions & 0 deletions store/tikv/2pc.go
Expand Up @@ -77,6 +77,7 @@ type twoPhaseCommitter struct {
undetermined bool
}
priority pb.CommandPri
syncLog bool
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
Expand Down Expand Up @@ -151,6 +152,7 @@ func newTwoPhaseCommitter(txn *tikvTxn) (*twoPhaseCommitter, error) {
mutations: mutations,
lockTTL: txnLockTTL(txn.startTime, size),
priority: getTxnPriority(txn),
syncLog: getTxnSyncLog(txn),
}, nil
}

Expand Down Expand Up @@ -330,6 +332,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
for {
Expand Down Expand Up @@ -398,6 +401,13 @@ func getTxnPriority(txn *tikvTxn) pb.CommandPri {
return pb.CommandPri_Normal
}

func getTxnSyncLog(txn *tikvTxn) bool {
if sync := txn.us.GetOption(kv.SyncLog); sync != nil {
return sync.(bool)
}
return false
}

func kvPriorityToCommandPri(pri int) pb.CommandPri {
switch pri {
case kv.PriorityLow:
Expand All @@ -416,6 +426,10 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
Keys: batch.keys,
CommitVersion: c.commitTS,
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
req.Context.Priority = c.priority

Expand Down Expand Up @@ -481,6 +495,10 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
Keys: batch.keys,
StartVersion: c.startTS,
},
Context: pb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
},
}
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/snapshot.go
Expand Up @@ -43,6 +43,7 @@ type tikvSnapshot struct {
isolationLevel kv.IsoLevel
priority pb.CommandPri
notFillCache bool
syncLog bool
}

var snapshotGP = gp.New(time.Minute)
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/txn.go
Expand Up @@ -129,6 +129,8 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {
txn.snapshot.priority = kvPriorityToCommandPri(val.(int))
case kv.NotFillCache:
txn.snapshot.notFillCache = val.(bool)
case kv.SyncLog:
txn.snapshot.syncLog = val.(bool)
}
}

Expand Down

0 comments on commit e6360b1

Please sign in to comment.