diff --git a/kv/tikv/response.go b/kv/tikv/response.go index 2128b598e..94de60693 100644 --- a/kv/tikv/response.go +++ b/kv/tikv/response.go @@ -91,7 +91,7 @@ func (rr *RespResult) commitResponse() (*kvrpcpb.CommitResponse, error) { } return nil, errors.New("Unexpected type in response") } -func (rr *RespResult) batchRollbackResponse() (*kvrpcpb.BatchRollbackResponse, error) { +func (rr *RespResult) rollbackResponse() (*kvrpcpb.BatchRollbackResponse, error) { if rr.Err != nil { return nil, rr.Err } diff --git a/kv/tikv/server.go b/kv/tikv/server.go index 2a3b6b35b..7f41d61ce 100644 --- a/kv/tikv/server.go +++ b/kv/tikv/server.go @@ -97,7 +97,9 @@ func (svr *Server) KvCheckTxnStatus(ctx context.Context, req *kvrpcpb.CheckTxnSt } func (svr *Server) KvBatchRollback(ctx context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) { - return nil, nil + cmd := commands.NewRollback(req) + resp := <-svr.scheduler.Run(&cmd) + return resp.rollbackResponse() } func (svr *Server) KvResolveLock(ctx context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) { diff --git a/kv/tikv/storage/commands/rollback.go b/kv/tikv/storage/commands/rollback.go new file mode 100644 index 000000000..beea81dba --- /dev/null +++ b/kv/tikv/storage/commands/rollback.go @@ -0,0 +1,70 @@ +package commands + +import ( + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore" + "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" +) + +type Rollback struct { + request *kvrpcpb.BatchRollbackRequest +} + +func NewRollback(request *kvrpcpb.BatchRollbackRequest) Rollback { + return Rollback{request} +} + +func (r *Rollback) BuildTxn(txn *kvstore.MvccTxn) error { + txn.StartTS = &r.request.StartVersion + for _, k := range r.request.Keys { + rollbackKey(k, txn) + } + return nil +} + +func rollbackKey(key []byte, txn *kvstore.MvccTxn) error { + lock, err := txn.GetLock(key) + if err != nil { + return err + } + if lock == nil || lock.TS != *txn.StartTS { + return &LockNotFound{key} + } + + if lock.Kind == kvstore.WriteKindPut { + txn.DeleteValue(key) + } + + write := kvstore.Write{StartTS: *txn.StartTS, Kind: kvstore.WriteKindRollback} + txn.PutWrite(key, &write, *txn.StartTS) + txn.DeleteLock(key) + + return nil +} + +func (r *Rollback) Context() *kvrpcpb.Context { + return r.request.Context +} + +func (r *Rollback) Response() interface{} { + return &kvrpcpb.BatchRollbackResponse{} +} + +func (r *Rollback) HandleError(err error) interface{} { + if err == nil { + return nil + } + + if regionErr := extractRegionError(err); regionErr != nil { + resp := kvrpcpb.BatchRollbackResponse{} + resp.RegionError = regionErr + return &resp + } + + if e, ok := err.(KeyError); ok { + resp := kvrpcpb.BatchRollbackResponse{} + resp.Error = e.keyErrors()[0] + return &resp + } + + return nil +} diff --git a/kv/tikv/storage/kvstore/transaction.go b/kv/tikv/storage/kvstore/transaction.go index bb65031b3..0e3fa413c 100644 --- a/kv/tikv/storage/kvstore/transaction.go +++ b/kv/tikv/storage/kvstore/transaction.go @@ -179,6 +179,17 @@ func (txn *MvccTxn) PutValue(key []byte, value []byte) { }) } +// DeleteValue removes a key/value pair in this transaction. +func (txn *MvccTxn) DeleteValue(key []byte) { + txn.Writes = append(txn.Writes, inner_server.Modify{ + Type: inner_server.ModifyTypeDelete, + Data: inner_server.Delete{ + Key: EncodeKey(key, *txn.StartTS), + Cf: engine_util.CfDefault, + }, + }) +} + // EncodeKey encodes a user key and appends an encoded timestamp to a key. Keys and timestamps are encoded so that // timestamped keys are sorted first by key (ascending), then by timestamp (descending). The encoding is based on // https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format. diff --git a/kv/tikv/storage/rollback_test.go b/kv/tikv/storage/rollback_test.go new file mode 100644 index 000000000..974caa39a --- /dev/null +++ b/kv/tikv/storage/rollback_test.go @@ -0,0 +1,146 @@ +package storage + +import ( + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore" + "testing" + + "github.com/pingcap-incubator/tinykv/kv/engine_util" + "github.com/pingcap-incubator/tinykv/kv/tikv/inner_server" + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/commands" + "github.com/pingcap-incubator/tinykv/kv/tikv/storage/exec" + "github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb" + "github.com/stretchr/testify/assert" +) + +// TestEmptyRollback tests a rollback with no keys. +func TestEmptyRollback(t *testing.T) { + mem := inner_server.NewMemInnerServer() + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest()) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + assert.Equal(t, 0, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 0, mem.Len(engine_util.CfWrite)) +} + +// TestRollback tests a successful rollback. +func TestRollback(t *testing.T) { + mem := inner_server.NewMemInnerServer() + // See TestSinglePrewrite. + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100), []byte{42}) + mem.Set(engine_util.CfLock, []byte{3}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 100}) + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest([]byte{3})) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + assert.Equal(t, 0, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 1, mem.Len(engine_util.CfWrite)) + assert.Equal(t, 0, mem.Len(engine_util.CfLock)) + assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 100))) +} + +// TestRollbackDuplicateKeys tests a rollback which rolls back multiple keys, including one duplicated key. +func TestRollbackDuplicateKeys(t *testing.T) { + mem := inner_server.NewMemInnerServer() + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100), []byte{42}) + mem.Set(engine_util.CfLock, []byte{3}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 100}) + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{15}, 100), []byte{0}) + mem.Set(engine_util.CfLock, []byte{15}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 100}) + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest([]byte{3}, []byte{15}, []byte{3})) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + assert.Equal(t, 0, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 2, mem.Len(engine_util.CfWrite)) + assert.Equal(t, 0, mem.Len(engine_util.CfLock)) + assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 100))) + assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{15}, 100))) +} + +// TestRollbackMissingPrewrite tests trying to roll back a missing prewrite. +func TestRollbackMissingPrewrite(t *testing.T) { + mem := inner_server.NewMemInnerServer() + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest([]byte{3})) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + assert.Equal(t, 0, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 0, mem.Len(engine_util.CfWrite)) +} + +// TestRollbackCommitted tests trying to roll back a transaction which is already committed. +func TestRollbackCommitted(t *testing.T) { + mem := inner_server.NewMemInnerServer() + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100), []byte{42}) + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110), []byte{1, 0, 0, 0, 0, 0, 0, 0, 100}) + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest([]byte{3})) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + // Should be no change. + assert.Equal(t, 1, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 0, mem.Len(engine_util.CfLock)) + assert.Equal(t, 1, mem.Len(engine_util.CfWrite)) + assert.Equal(t, []byte{1, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110))) + assert.Equal(t, []byte{42}, mem.Get(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 100))) +} + +// TestRollbackDuplicate tests trying to roll back a transaction which has already been rolled back. +func TestRollbackDuplicate(t *testing.T) { + mem := inner_server.NewMemInnerServer() + mem.Set(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110), []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}) + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest([]byte{3})) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + // Should be no change. + assert.Equal(t, 0, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 0, mem.Len(engine_util.CfLock)) + assert.Equal(t, 1, mem.Len(engine_util.CfWrite)) + assert.Equal(t, []byte{3, 0, 0, 0, 0, 0, 0, 0, 100}, mem.Get(engine_util.CfWrite, kvstore.EncodeKey([]byte{3}, 110))) +} + +// TestRollbackOtherTxn tests trying to roll back the wrong transaction. +func TestRollbackOtherTxn(t *testing.T) { + mem := inner_server.NewMemInnerServer() + mem.Set(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 80), []byte{42}) + mem.Set(engine_util.CfLock, []byte{3}, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 80}) + sched := exec.NewSeqScheduler(mem) + + builder := newReqBuilder() + cmd := commands.NewRollback(builder.rollbackRequest([]byte{3})) + resp := run(t, sched, &cmd)[0].(*kvrpcpb.BatchRollbackResponse) + assert.Nil(t, resp.Error) + assert.Nil(t, resp.RegionError) + assert.Equal(t, 1, mem.Len(engine_util.CfDefault)) + assert.Equal(t, 0, mem.Len(engine_util.CfWrite)) + assert.Equal(t, 1, mem.Len(engine_util.CfLock)) + assert.Equal(t, []byte{42}, mem.Get(engine_util.CfDefault, kvstore.EncodeKey([]byte{3}, 80))) + assert.Equal(t, []byte{1, 1, 0, 0, 0, 0, 0, 0, 0, 80}, mem.Get(engine_util.CfLock, []byte{3})) +} + +func (builder *requestBuilder) rollbackRequest(keys ...[]byte) *kvrpcpb.BatchRollbackRequest { + var req kvrpcpb.BatchRollbackRequest + req.StartVersion = builder.nextTS + req.Keys = keys + builder.nextTS++ + return &req +}