Skip to content

Commit

Permalink
kv: only allow bounding a scan through bounding a request batch
Browse files Browse the repository at this point in the history
Removed the support for bounding a particular scan request. A scan
request has a begin and end key and is unbounded. In order to bound
scans, the user needs to apply a bound on the batch which the scan
is a part of.
  • Loading branch information
vivekmenezes committed Jul 25, 2016
1 parent 6c5d603 commit c217fce
Show file tree
Hide file tree
Showing 16 changed files with 331 additions and 541 deletions.
24 changes: 11 additions & 13 deletions internal/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (b *Batch) Inc(key interface{}, value int64) {
b.initResult(1, 1, notRaw, nil)
}

func (b *Batch) scan(s, e interface{}, maxRows int64, isReverse bool) {
func (b *Batch) scan(s, e interface{}, isReverse bool) {
begin, err := marshalKey(s)
if err != nil {
b.initResult(0, 0, notRaw, err)
Expand All @@ -458,35 +458,33 @@ func (b *Batch) scan(s, e interface{}, maxRows int64, isReverse bool) {
return
}
if !isReverse {
b.appendReqs(roachpb.NewScan(roachpb.Key(begin), roachpb.Key(end), maxRows))
b.appendReqs(roachpb.NewScan(roachpb.Key(begin), roachpb.Key(end)))
} else {
b.appendReqs(roachpb.NewReverseScan(roachpb.Key(begin), roachpb.Key(end), maxRows))
b.appendReqs(roachpb.NewReverseScan(roachpb.Key(begin), roachpb.Key(end)))
}
b.initResult(1, 0, notRaw, nil)
}

// Scan retrieves the key/values between begin (inclusive) and end (exclusive) in
// ascending order.
//
// A new result will be appended to the batch which will contain up to maxRows
// "rows" (each row is a key/value pair) and Result.Err will indicate success or
// failure.
// A new result will be appended to the batch which will contain "rows" (each
// row is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) Scan(s, e interface{}, maxRows int64) {
b.scan(s, e, maxRows, false)
func (b *Batch) Scan(s, e interface{}) {
b.scan(s, e, false)
}

// ReverseScan retrieves the rows between begin (inclusive) and end (exclusive)
// in descending order.
//
// A new result will be appended to the batch which will contain up to maxRows
// rows (each "row" is a key/value pair) and Result.Err will indicate success or
// failure.
// A new result will be appended to the batch which will contain "rows" (each
// "row" is a key/value pair) and Result.Err will indicate success or failure.
//
// key can be either a byte slice or a string.
func (b *Batch) ReverseScan(s, e interface{}, maxRows int64) {
b.scan(s, e, maxRows, true)
func (b *Batch) ReverseScan(s, e interface{}) {
b.scan(s, e, true)
}

// CheckConsistency creates a batch request to check the consistency of the
Expand Down
47 changes: 23 additions & 24 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ func TestClientBatch(t *testing.T) {
// Now try 2 scans.
{
b := &client.Batch{}
b.Scan(testUser+"/key 00", testUser+"/key 05", 0)
b.Scan(testUser+"/key 05", testUser+"/key 10", 0)
b.Scan(testUser+"/key 00", testUser+"/key 05")
b.Scan(testUser+"/key 05", testUser+"/key 10")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -513,9 +513,9 @@ func TestClientBatch(t *testing.T) {
// Try a limited batch of 2 scans.
{
b := &client.Batch{}
b.Header.MaxScanResults = 7
b.Scan(testUser+"/key 00", testUser+"/key 05", 0)
b.Scan(testUser+"/key 05", testUser+"/key 10", 0)
b.Header.MaxSpanRequestKeys = 7
b.Scan(testUser+"/key 00", testUser+"/key 05")
b.Scan(testUser+"/key 05", testUser+"/key 10")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -526,9 +526,9 @@ func TestClientBatch(t *testing.T) {
// Try a limited batch of 2 scans.
{
b := &client.Batch{}
b.Header.MaxScanResults = 7
b.Scan(testUser+"/key 05", testUser+"/key 10", 0)
b.Scan(testUser+"/key 00", testUser+"/key 05", 0)
b.Header.MaxSpanRequestKeys = 7
b.Scan(testUser+"/key 05", testUser+"/key 10")
b.Scan(testUser+"/key 00", testUser+"/key 05")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -539,9 +539,9 @@ func TestClientBatch(t *testing.T) {
// Try a limited batch of 2 scans.
{
b := &client.Batch{}
b.Header.MaxScanResults = 3
b.Scan(testUser+"/key 00", testUser+"/key 05", 0)
b.Scan(testUser+"/key 05", testUser+"/key 10", 0)
b.Header.MaxSpanRequestKeys = 3
b.Scan(testUser+"/key 00", testUser+"/key 05")
b.Scan(testUser+"/key 05", testUser+"/key 10")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -552,8 +552,8 @@ func TestClientBatch(t *testing.T) {
// Try 2 reverse scans.
{
b := &client.Batch{}
b.ReverseScan(testUser+"/key 00", testUser+"/key 05", 0)
b.ReverseScan(testUser+"/key 05", testUser+"/key 10", 0)
b.ReverseScan(testUser+"/key 00", testUser+"/key 05")
b.ReverseScan(testUser+"/key 05", testUser+"/key 10")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -564,9 +564,9 @@ func TestClientBatch(t *testing.T) {
// Try a limited batch of 2 reverse scans.
{
b := &client.Batch{}
b.Header.MaxScanResults = 7
b.ReverseScan(testUser+"/key 00", testUser+"/key 05", 0)
b.ReverseScan(testUser+"/key 05", testUser+"/key 10", 0)
b.Header.MaxSpanRequestKeys = 7
b.ReverseScan(testUser+"/key 00", testUser+"/key 05")
b.ReverseScan(testUser+"/key 05", testUser+"/key 10")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -577,9 +577,9 @@ func TestClientBatch(t *testing.T) {
// Try a limited batch of 2 reverse scans.
{
b := &client.Batch{}
b.Header.MaxScanResults = 7
b.ReverseScan(testUser+"/key 05", testUser+"/key 10", 0)
b.ReverseScan(testUser+"/key 00", testUser+"/key 05", 0)
b.Header.MaxSpanRequestKeys = 7
b.ReverseScan(testUser+"/key 05", testUser+"/key 10")
b.ReverseScan(testUser+"/key 00", testUser+"/key 05")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand All @@ -590,9 +590,9 @@ func TestClientBatch(t *testing.T) {
// Try a limited batch of 2 reverse scans.
{
b := &client.Batch{}
b.Header.MaxScanResults = 3
b.ReverseScan(testUser+"/key 00", testUser+"/key 05", 0)
b.ReverseScan(testUser+"/key 05", testUser+"/key 10", 0)
b.Header.MaxSpanRequestKeys = 3
b.ReverseScan(testUser+"/key 00", testUser+"/key 05")
b.ReverseScan(testUser+"/key 05", testUser+"/key 10")
if err := db.Run(b); err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -828,8 +828,7 @@ func TestInconsistentReads(t *testing.T) {
b := prepInconsistent()
key1 := roachpb.Key([]byte("key1"))
key2 := roachpb.Key([]byte("key2"))
const dontCareMaxRows = 1000
b.Scan(key1, key2, dontCareMaxRows)
b.Scan(key1, key2)
if err := db.Run(b); err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 5 additions & 2 deletions internal/client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,13 @@ func (db *DB) scan(
) ([]KeyValue, error) {
b := db.NewBatch()
b.Header.ReadConsistency = readConsistency
if maxRows > 0 {
b.Header.MaxSpanRequestKeys = maxRows
}
if !isReverse {
b.Scan(begin, end, maxRows)
b.Scan(begin, end)
} else {
b.ReverseScan(begin, end, maxRows)
b.ReverseScan(begin, end)
}
r, err := runOneResult(db, b)
return r.Rows, err
Expand Down
7 changes: 5 additions & 2 deletions internal/client/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,13 @@ func (txn *Txn) Inc(key interface{}, value int64) (KeyValue, error) {

func (txn *Txn) scan(begin, end interface{}, maxRows int64, isReverse bool) ([]KeyValue, error) {
b := txn.NewBatch()
if maxRows > 0 {
b.Header.MaxSpanRequestKeys = maxRows
}
if !isReverse {
b.Scan(begin, end, maxRows)
b.Scan(begin, end)
} else {
b.ReverseScan(begin, end, maxRows)
b.ReverseScan(begin, end)
}
r, err := runOneResult(txn, b)
return r.Rows, err
Expand Down
83 changes: 10 additions & 73 deletions kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (ds *DistSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roach
panic("empty batch")
}

if ba.MaxScanResults != 0 {
if ba.MaxSpanRequestKeys != 0 {
// Verify that the batch contains only Scan or ReverseScan requests.
fwd, rev := false, false
for _, req := range ba.Requests {
Expand All @@ -554,10 +554,10 @@ func (ds *DistSender) Send(ctx context.Context, ba roachpb.BatchRequest) (*roach

var rplChunks []*roachpb.BatchResponse
parts := ba.Split(false /* don't split ET */)
if len(parts) > 1 && ba.MaxScanResults != 0 {
if len(parts) > 1 && ba.MaxSpanRequestKeys != 0 {
// We already verified above that the batch contains only scan requests of the same type.
// Such a batch should never need splitting.
panic("batch with MaxScanResults needs splitting")
panic("batch with MaxSpanRequestKeys needs splitting")
}
for len(parts) > 0 {
part := parts[0]
Expand Down Expand Up @@ -805,19 +805,19 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
}
}

if ba.MaxScanResults > 0 {
if ba.MaxSpanRequestKeys > 0 {
// Count how many results we received.
var numResults int64
for _, resp := range curReply.Responses {
if cResp, ok := resp.GetInner().(roachpb.Countable); ok {
numResults += cResp.Count()
}
}
if numResults > ba.MaxScanResults {
panic(fmt.Sprintf("received %d results, limit was %d", numResults, ba.MaxScanResults))
if numResults > ba.MaxSpanRequestKeys {
panic(fmt.Sprintf("received %d results, limit was %d", numResults, ba.MaxSpanRequestKeys))
}
ba.MaxScanResults -= numResults
if ba.MaxScanResults == 0 {
ba.MaxSpanRequestKeys -= numResults
if ba.MaxSpanRequestKeys == 0 {
// We are done with this batch. Some requests might have NoopResponses; we must
// replace them with empty responses of the proper type.
for i, req := range ba.Requests {
Expand All @@ -839,65 +839,6 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
}
}

// If this request has a bound (such as MaxResults in ScanRequest) and
// we are going to query at least one more range, check whether enough
// rows have been retrieved.
if needAnother {
// Start with the assumption that all requests are saturated.
// Below, we look at each and decide whether that's true.
// Everything that is indeed saturated is "masked out" from the
// batch request; only if that's all requests does needAnother
// remain false.
needAnother = false
if br == nil {
// Clone ba.Requests. This is because we're multi-range, and
// some requests may be bounded, which could lead to them being
// masked out once they're saturated. We don't want to risk
// removing requests that way in the "master copy" since that
// could lead to omitting requests in certain retry scenarios.
ba.Requests = append([]roachpb.RequestUnion(nil), ba.Requests...)
}
for i, union := range ba.Requests {
args := union.GetInner()
if _, ok := args.(*roachpb.NoopRequest); ok {
// NoopRequests are skipped.
continue
}
boundedArg, ok := args.(roachpb.Bounded)
if !ok {
// Non-bounded request. We will have to continue querying
// until this request is satisfied. This request might
// have already been satisfied at this stage but we have
// to be pessimistic here.
needAnother = true
continue
}
prevBound := boundedArg.GetBound()
cReply, ok := curReply.Responses[i].GetInner().(roachpb.Countable)
if !ok || prevBound <= 0 {
// Request bounded, but without max results. Again, will
// need to query everything we can. The case in which the reply
// isn't countable occurs when the request wasn't active for
// that range (since it didn't apply to it), so the response
// is a NoopResponse.
needAnother = true
continue
}
nextBound := prevBound - cReply.Count()
if nextBound <= 0 {
// We've hit max results for this piece of the batch. Mask
// it out (we've copied the requests slice above, so this
// is kosher).
union := &ba.Requests[i] // avoid working on copy
union.MustSetInner(&noopRequest)
continue
}
// The request isn't saturated yet.
needAnother = true
boundedArg.SetBound(nextBound)
}
}

// If this was the last range accessed by this call, exit loop.
if !needAnother {
return br, nil, false
Expand All @@ -923,13 +864,9 @@ func (ds *DistSender) sendChunk(ctx context.Context, ba roachpb.BatchRequest) (*
return nil, roachpb.NewError(err), false
}

// It's possible that the key update has created an empty interval,
// indicating that we're done. For example, a bounded scan could have
// been masked out as saturated, while an unbounded request that has
// completed could have been the reason for needing the next
// descriptor (needAnother=true); we now have rs.Key=KeyMax.
// The start key must be strictly less than the end key at this point.
if !rs.Key.Less(rs.EndKey) {
return br, nil, false
panic(fmt.Sprintf("start key %s is less than %s", rs.Key, rs.EndKey))
}

log.Trace(ctx, "querying next range")
Expand Down

0 comments on commit c217fce

Please sign in to comment.