Skip to content

Commit

Permalink
Implement rollback
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Cameron <nrc@ncameron.org>
  • Loading branch information
nrc committed Feb 13, 2020
1 parent 9dc002c commit e0d3499
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 2 deletions.
2 changes: 1 addition & 1 deletion kv/tikv/response.go
Expand Up @@ -91,7 +91,7 @@ func (rr *RespResult) commitResponse() (*kvrpcpb.CommitResponse, error) {
}
return nil, errors.New("Unexpected type in response")
}
func (rr *RespResult) batchRollbackResponse() (*kvrpcpb.BatchRollbackResponse, error) {
func (rr *RespResult) rollbackResponse() (*kvrpcpb.BatchRollbackResponse, error) {
if rr.Err != nil {
return nil, rr.Err
}
Expand Down
4 changes: 3 additions & 1 deletion kv/tikv/server.go
Expand Up @@ -97,7 +97,9 @@ func (svr *Server) KvCheckTxnStatus(ctx context.Context, req *kvrpcpb.CheckTxnSt
}

func (svr *Server) KvBatchRollback(ctx context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) {
return nil, nil
cmd := commands.NewRollback(req)
resp := <-svr.scheduler.Run(&cmd)
return resp.rollbackResponse()
}

func (svr *Server) KvResolveLock(ctx context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
Expand Down
70 changes: 70 additions & 0 deletions kv/tikv/storage/commands/rollback.go
@@ -0,0 +1,70 @@
package commands

import (
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
)

type Rollback struct {
request *kvrpcpb.BatchRollbackRequest
}

func NewRollback(request *kvrpcpb.BatchRollbackRequest) Rollback {
return Rollback{request}
}

func (r *Rollback) BuildTxn(txn *kvstore.MvccTxn) error {
txn.StartTS = &r.request.StartVersion
for _, k := range r.request.Keys {
rollbackKey(k, txn)
}
return nil
}

func rollbackKey(key []byte, txn *kvstore.MvccTxn) error {
lock, err := txn.GetLock(key)
if err != nil {
return err
}
if lock == nil || lock.TS != *txn.StartTS {
return &LockNotFound{key}
}

if lock.Kind == kvstore.WriteKindPut {
txn.DeleteValue(key)
}

write := kvstore.Write{StartTS: *txn.StartTS, Kind: kvstore.WriteKindRollback}
txn.PutWrite(key, &write, *txn.StartTS)
txn.DeleteLock(key)

return nil
}

func (r *Rollback) Context() *kvrpcpb.Context {
return r.request.Context
}

func (r *Rollback) Response() interface{} {
return &kvrpcpb.BatchRollbackResponse{}
}

func (r *Rollback) HandleError(err error) interface{} {
if err == nil {
return nil
}

if regionErr := extractRegionError(err); regionErr != nil {
resp := kvrpcpb.BatchRollbackResponse{}
resp.RegionError = regionErr
return &resp
}

if e, ok := err.(KeyError); ok {
resp := kvrpcpb.BatchRollbackResponse{}
resp.Error = e.keyErrors()[0]
return &resp
}

return nil
}
11 changes: 11 additions & 0 deletions kv/tikv/storage/kvstore/transaction.go
Expand Up @@ -179,6 +179,17 @@ func (txn *MvccTxn) PutValue(key []byte, value []byte) {
})
}

// DeleteValue removes a key/value pair in this transaction.
func (txn *MvccTxn) DeleteValue(key []byte) {
txn.Writes = append(txn.Writes, inner_server.Modify{
Type: inner_server.ModifyTypeDelete,
Data: inner_server.Delete{
Key: EncodeKey(key, *txn.StartTS),
Cf: engine_util.CfDefault,
},
})
}

// EncodeKey encodes a user key and appends an encoded timestamp to a key. Keys and timestamps are encoded so that
// timestamped keys are sorted first by key (ascending), then by timestamp (descending). The encoding is based on
// https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format.
Expand Down
146 changes: 146 additions & 0 deletions kv/tikv/storage/rollback_test.go
@@ -0,0 +1,146 @@
package storage

import (
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"testing"

"github.com/pingcap-incubator/tinykv/kv/engine_util"
"github.com/pingcap-incubator/tinykv/kv/tikv/inner_server"
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/commands"
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/exec"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
"github.com/stretchr/testify/assert"
)

// TestEmptyRollback tests a rollback with no keys.
func TestEmptyRollback(t *testing.T) {
mem := inner_server.NewMemInnerServer()
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest())
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
assert.Equal(t, 0, mem.Len(engine_util.CfDefault))
assert.Equal(t, 0, mem.Len(engine_util.CfWrite))
}

// TestRollback tests a successful rollback.
func TestRollback(t *testing.T) {
mem := inner_server.NewMemInnerServer()
// See TestSinglePrewrite.
mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100), []byte{42})
mem.Set(engine_util.CfLock, []byte{3}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 100})
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}))
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
assert.Equal(t, 0, mem.Len(engine_util.CfDefault))
assert.Equal(t, 1, mem.Len(engine_util.CfWrite))
assert.Equal(t, 0, mem.Len(engine_util.CfLock))
assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 100)))
}

// TestRollbackDuplicateKeys tests a rollback which rolls back multiple keys, including one duplicated key.
func TestRollbackDuplicateKeys(t *testing.T) {
mem := inner_server.NewMemInnerServer()
mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100), []byte{42})
mem.Set(engine_util.CfLock, []byte{3}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 100})
mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{15}, 100), []byte{0})
mem.Set(engine_util.CfLock, []byte{15}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 100})
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}, []byte{15}, []byte{3}))
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
assert.Equal(t, 0, mem.Len(engine_util.CfDefault))
assert.Equal(t, 2, mem.Len(engine_util.CfWrite))
assert.Equal(t, 0, mem.Len(engine_util.CfLock))
assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 100)))
assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{15}, 100)))
}

// TestRollbackMissingPrewrite tests trying to roll back a missing prewrite.
func TestRollbackMissingPrewrite(t *testing.T) {
mem := inner_server.NewMemInnerServer()
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}))
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
assert.Equal(t, 0, mem.Len(engine_util.CfDefault))
assert.Equal(t, 0, mem.Len(engine_util.CfWrite))
}

// TestRollbackCommitted tests trying to roll back a transaction which is already committed.
func TestRollbackCommitted(t *testing.T) {
mem := inner_server.NewMemInnerServer()
mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100), []byte{42})
mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110), []byte{1, 0, 0, 0, 0, 0, 0, 0, 100})
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}))
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
// Should be no change.
assert.Equal(t, 1, mem.Len(engine_util.CfDefault))
assert.Equal(t, 0, mem.Len(engine_util.CfLock))
assert.Equal(t, 1, mem.Len(engine_util.CfWrite))
assert.Equal(t, []byte{1, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110)))
assert.Equal(t, []byte{42}, mem.Get(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100)))
}

// TestRollbackDuplicate tests trying to roll back a transaction which has already been rolled back.
func TestRollbackDuplicate(t *testing.T) {
mem := inner_server.NewMemInnerServer()
mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110), []byte{3, 0, 0, 0, 0, 0, 0, 0, 100})
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}))
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
// Should be no change.
assert.Equal(t, 0, mem.Len(engine_util.CfDefault))
assert.Equal(t, 0, mem.Len(engine_util.CfLock))
assert.Equal(t, 1, mem.Len(engine_util.CfWrite))
assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110)))
}

// TestRollbackOtherTxn tests trying to roll back the wrong transaction.
func TestRollbackOtherTxn(t *testing.T) {
mem := inner_server.NewMemInnerServer()
mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 80), []byte{42})
mem.Set(engine_util.CfLock, []byte{3}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 80})
sched := exec.NewSeqScheduler(mem)

builder := newReqBuilder()
cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}))
resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse)
assert.Nil(t, resp.Error)
assert.Nil(t, resp.RegionError)
assert.Equal(t, 1, mem.Len(engine_util.CfDefault))
assert.Equal(t, 0, mem.Len(engine_util.CfWrite))
assert.Equal(t, 1, mem.Len(engine_util.CfLock))
assert.Equal(t, []byte{42}, mem.Get(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 80)))
assert.Equal(t, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 80}, mem.Get(engine_util.CfLock, []byte{3}))
}

func (builder *requestBuilder) rollbackRequest(keys ...[]byte) *kvrpcpb.BatchRollbackRequest {
var req kvrpcpb.BatchRollbackRequest
req.StartVersion = builder.nextTS
req.Keys = keys
builder.nextTS++
return &req
}

0 comments on commit e0d3499

Please sign in to comment.