Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement rollback and scan #59

Merged
merged 3 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kv/tikv/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (rr *RespResult) commitResponse() (*kvrpcpb.CommitResponse, error) {
}
return nil, errors.New("Unexpected type in response")
}
func (rr *RespResult) batchRollbackResponse() (*kvrpcpb.BatchRollbackResponse, error) {
func (rr *RespResult) rollbackResponse() (*kvrpcpb.BatchRollbackResponse, error) {
if rr.Err != nil {
return nil, rr.Err
}
Expand Down
8 changes: 6 additions & 2 deletions kv/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (svr *Server) KvGet(ctx context.Context, req *kvrpcpb.GetRequest) (*kvrpcpb
}

func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpcpb.ScanResponse, error) {
return nil, nil
cmd := commands.NewScan(req)
resp := <-svr.scheduler.Run(&cmd)
return resp.scanResponse()
}

func (svr *Server) KvPrewrite(ctx context.Context, req *kvrpcpb.PrewriteRequest) (*kvrpcpb.PrewriteResponse, error) {
Expand All @@ -97,7 +99,9 @@ func (svr *Server) KvCheckTxnStatus(ctx context.Context, req *kvrpcpb.CheckTxnSt
}

func (svr *Server) KvBatchRollback(ctx context.Context, req *kvrpcpb.BatchRollbackRequest) (*kvrpcpb.BatchRollbackResponse, error) {
return nil, nil
cmd := commands.NewRollback(req)
resp := <-svr.scheduler.Run(&cmd)
return resp.rollbackResponse()
}

func (svr *Server) KvResolveLock(ctx context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
Expand Down
4 changes: 2 additions & 2 deletions kv/tikv/storage/commands/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func commitKey(key []byte, commitTs uint64, txn *kvstore.MvccTxn) error {

if lock.TS != *txn.StartTS {
// Key is locked by a different transaction.
write, err := txn.FindWrite(key, *txn.StartTS)
write, _, err := txn.FindWrite(key, *txn.StartTS)
if err != nil {
return err
}
Expand Down Expand Up @@ -88,7 +88,7 @@ func (c *Commit) HandleError(err error) interface{} {

if e, ok := err.(KeyError); ok {
resp := kvrpcpb.CommitResponse{}
resp.Error = e.keyErrors()[0]
resp.Error = e.KeyErrors()[0]
return &resp
}

Expand Down
41 changes: 18 additions & 23 deletions kv/tikv/storage/commands/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,8 @@ import (
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
)

// LockedError occurs when a key or keys are locked. The protobuf representation of the locked keys is stored as info.
type LockedError struct {
info []kvrpcpb.LockInfo
}

func (err *LockedError) Error() string {
return fmt.Sprintf("storage: %d keys are locked", len(err.info))
}

type KeyError interface {
keyErrors() []*kvrpcpb.KeyError
}

// keyErrors converts a LockedError to amn array of KeyErrors for sending to the client.
func (err *LockedError) keyErrors() []*kvrpcpb.KeyError {
var result []*kvrpcpb.KeyError
for _, i := range err.info {
var ke kvrpcpb.KeyError
ke.Locked = &i
result = append(result, &ke)
}
return result
KeyErrors() []*kvrpcpb.KeyError
}

// WriteConflict occurs when writes from two transactions conflict.
Expand All @@ -43,7 +23,7 @@ func (err *WriteConflict) Error() string {
return fmt.Sprintf("storage: write conflict at key %d", err.key)
}

func (err *WriteConflict) keyErrors() []*kvrpcpb.KeyError {
func (err *WriteConflict) KeyErrors() []*kvrpcpb.KeyError {
var result kvrpcpb.KeyError
result.Conflict = &kvrpcpb.WriteConflict{
StartTs: err.startTS,
Expand All @@ -70,8 +50,23 @@ func (err *LockNotFound) Error() string {
return fmt.Sprintf("storage: lock not found for %v", err.key)
}

func (err *LockNotFound) keyErrors() []*kvrpcpb.KeyError {
func (err *LockNotFound) KeyErrors() []*kvrpcpb.KeyError {
var result kvrpcpb.KeyError
result.Retryable = fmt.Sprintf("lock not found for key %v", err.key)
return []*kvrpcpb.KeyError{&result}
}

type Committed struct {
key []byte
ts uint64
}

func (err *Committed) Error() string {
return fmt.Sprintf("storage: key has already been committed: %v at %d", err.key, err.ts)
}

func (err *Committed) KeyErrors() []*kvrpcpb.KeyError {
var result kvrpcpb.KeyError
result.Abort = fmt.Sprintf("key has already been committed: %v at %d", err.key, err.ts)
return []*kvrpcpb.KeyError{&result}
}
4 changes: 2 additions & 2 deletions kv/tikv/storage/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (g *Get) BuildTxn(txn *kvstore.MvccTxn) error {
}
if lock.IsLockedFor(key, *txn.StartTS) {
// Key is locked.
return &LockedError{[]kvrpcpb.LockInfo{*lock.Info(key)}}
return &kvstore.LockedError{Info: []kvrpcpb.LockInfo{*lock.Info(key)}}
}

// Search writes for a committed value.
Expand Down Expand Up @@ -61,7 +61,7 @@ func (g *Get) HandleError(err error) interface{} {
}

if e, ok := err.(KeyError); ok {
keyErrs := e.keyErrors()
keyErrs := e.KeyErrors()
if len(keyErrs) > 0 {
g.response.Error = keyErrs[0]
return &g.response
Expand Down
4 changes: 2 additions & 2 deletions kv/tikv/storage/commands/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p *Prewrite) BuildTxn(txn *kvstore.MvccTxn) error {
}

// If any keys were locked, we'll return this to the client.
return &LockedError{locks}
return &kvstore.LockedError{Info: locks}
}

func (p *Prewrite) Context() *kvrpcpb.Context {
Expand All @@ -61,7 +61,7 @@ func (p *Prewrite) HandleError(err error) interface{} {

if e, ok := err.(KeyError); ok {
resp := kvrpcpb.PrewriteResponse{}
resp.Errors = e.keyErrors()
resp.Errors = e.KeyErrors()
return &resp
}

Expand Down
94 changes: 94 additions & 0 deletions kv/tikv/storage/commands/rollback.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package commands

import (
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
)

type Rollback struct {
request *kvrpcpb.BatchRollbackRequest
}

func NewRollback(request *kvrpcpb.BatchRollbackRequest) Rollback {
return Rollback{request}
}

func (r *Rollback) BuildTxn(txn *kvstore.MvccTxn) error {
txn.StartTS = &r.request.StartVersion
for _, k := range r.request.Keys {
err := rollbackKey(k, txn)
if err != nil {
return err
}
}
return nil
}

func rollbackKey(key []byte, txn *kvstore.MvccTxn) error {
lock, err := txn.GetLock(key)
if err != nil {
return err
}

if lock == nil || lock.TS != *txn.StartTS {
// There is no lock, check the write status.
existingWrite, ts, err := txn.FindWrite(key, *txn.StartTS)
if err != nil {
return err
}
if existingWrite == nil {
// There is no write either, presumably the prewrite was lost. We insert a rollback write anyway.
write := kvstore.Write{StartTS: *txn.StartTS, Kind: kvstore.WriteKindRollback}
txn.PutWrite(key, &write, *txn.StartTS)

return nil
} else {
if existingWrite.Kind == kvstore.WriteKindRollback {
// The key has already been rolled back, so nothing to do.
return nil
}

// The key has already been committed. This should not happen since the client should never send both
// commit and rollback requests.
return &Committed{key, ts}
}
}

if lock.Kind == kvstore.WriteKindPut {
txn.DeleteValue(key)
}

write := kvstore.Write{StartTS: *txn.StartTS, Kind: kvstore.WriteKindRollback}
txn.PutWrite(key, &write, *txn.StartTS)
txn.DeleteLock(key)

return nil
}

func (r *Rollback) Context() *kvrpcpb.Context {
return r.request.Context
}

func (r *Rollback) Response() interface{} {
return &kvrpcpb.BatchRollbackResponse{}
}

func (r *Rollback) HandleError(err error) interface{} {
if err == nil {
return nil
}

if regionErr := extractRegionError(err); regionErr != nil {
resp := kvrpcpb.BatchRollbackResponse{}
resp.RegionError = regionErr
return &resp
}

if e, ok := err.(KeyError); ok {
resp := kvrpcpb.BatchRollbackResponse{}
resp.Error = e.KeyErrors()[0]
return &resp
}

return nil
}
75 changes: 75 additions & 0 deletions kv/tikv/storage/commands/scan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package commands

import (
"github.com/pingcap-incubator/tinykv/kv/tikv/storage/kvstore"
"github.com/pingcap-incubator/tinykv/proto/pkg/kvrpcpb"
)

type Scan struct {
request *kvrpcpb.ScanRequest
response *kvrpcpb.ScanResponse
}

func NewScan(request *kvrpcpb.ScanRequest) Scan {
return Scan{request, &kvrpcpb.ScanResponse{}}
}

func (s *Scan) BuildTxn(txn *kvstore.MvccTxn) error {
txn.StartTS = &s.request.Version

scanner := kvstore.NewScanner(s.request.StartKey, txn)
limit := s.request.Limit
for {
if limit == 0 {
// We've scanned up to the requested limit.
return nil
}
limit -= 1

key, value, err := scanner.Next()
if err != nil {
// Key error (e.g., key is locked) is saved as an error in the scan for the client to handle.
if e, ok := err.(KeyError); ok {
keyErrs := e.KeyErrors()
if len(keyErrs) == 0 {
pair := kvrpcpb.KvPair{}
pair.Error = keyErrs[0]
s.response.Pairs = append(s.response.Pairs, &pair)
continue
}
}
// Any other kind of error, we can't handle so quit the scan.
return err
}
if key == nil {
// Reached the end of the DB
return nil
}

pair := kvrpcpb.KvPair{}
pair.Key = key
pair.Value = value
s.response.Pairs = append(s.response.Pairs, &pair)
}
}

func (s *Scan) Context() *kvrpcpb.Context {
return s.request.Context
}

func (s *Scan) Response() interface{} {
return s.response
}

func (s *Scan) HandleError(err error) interface{} {
if err == nil {
return nil
}

if regionErr := extractRegionError(err); regionErr != nil {
s.response.RegionError = regionErr
return &s.response
}

return nil
}
20 changes: 20 additions & 0 deletions kv/tikv/storage/kvstore/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,23 @@ func (lock *Lock) IsLockedFor(key []byte, txnStartTs uint64) bool {
}
return lock.TS <= txnStartTs
}

// LockedError occurs when a key or keys are locked. The protobuf representation of the locked keys is stored as Info.
type LockedError struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move all errors into one file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error can't be with the others in commands because that would create an import cycle. I could pull it out into its own file in kvstore, but since there is just one error I thought it was better to put it here. I don't mind pulling it out though if you prefer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, seems can't still put it in commands/errors.go? 🙁let's keep it now, and make a reorganization later. As I assumed, the course would be only left the predefined errors, types and latch in final, so the errors should be in one file, not everywhere.

Info []kvrpcpb.LockInfo
}

func (err *LockedError) Error() string {
return fmt.Sprintf("storage: %d keys are locked", len(err.Info))
}

// KeyErrors converts a LockedError to an array of KeyErrors for sending to the client.
func (err *LockedError) KeyErrors() []*kvrpcpb.KeyError {
var result []*kvrpcpb.KeyError
for _, i := range err.Info {
var ke kvrpcpb.KeyError
ke.Locked = &i
result = append(result, &ke)
}
return result
}
Loading