Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Make use of the upperBound of ticlient's kv_scan interface to ensure no overbound scan will happen (#8081) #8310

Merged
merged 6 commits into from Nov 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion ddl/column.go
Expand Up @@ -14,6 +14,7 @@
package ddl

import (
"math"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -304,7 +305,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
for {
startTime := time.Now()
handles = handles[:0]
err = iterateSnapshotRows(d.store, t, version, seekHandle,
err = iterateSnapshotRows(d.store, t, version, seekHandle, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
handles = append(handles, h)
if len(handles) == defaultBatchCnt {
Expand Down
8 changes: 2 additions & 6 deletions ddl/delete_range.go
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"bytes"
"encoding/hex"
"fmt"
"math"
Expand Down Expand Up @@ -155,7 +154,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
finish := true
dr.keys = dr.keys[:0]
err := kv.RunInNewTxn(dr.d.store, false, func(txn kv.Transaction) error {
iter, err := txn.Iter(oldStartKey, nil)
iter, err := txn.Iter(oldStartKey, r.EndKey)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -165,10 +164,7 @@ func (dr *delRange) doTask(ctx sessionctx.Context, r util.DelRangeTask) error {
if !iter.Valid() {
break
}
finish = bytes.Compare(iter.Key(), r.EndKey) >= 0
if finish {
break
}
finish = false
dr.keys = append(dr.keys, iter.Key().Clone())
newStartKey = iter.Key().Next()

Expand Down
22 changes: 18 additions & 4 deletions ddl/index.go
Expand Up @@ -550,7 +550,7 @@ func (w *addIndexWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgInde
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := time.Now()
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle,
err := iterateSnapshotRows(w.sessCtx.GetStore(), w.table, txn.StartTS(), taskRange.startHandle, taskRange.endHandle, taskRange.endIncluded,
func(handle int64, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
w.logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotRows in fetchRowColVals", 0)
Expand Down Expand Up @@ -1000,16 +1000,30 @@ func allocateIndexID(tblInfo *model.TableInfo) int64 {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
func iterateSnapshotRows(store kv.Storage, t table.Table, version uint64, startHandle int64, endHandle int64, endIncluded bool, fn recordIterFunc) error {
ver := kv.Version{Ver: version}

snap, err := store.GetSnapshot(ver)
snap.SetPriority(kv.PriorityLow)
if err != nil {
return errors.Trace(err)
}
firstKey := t.RecordKey(seekHandle)
it, err := snap.Iter(firstKey, nil)
firstKey := t.RecordKey(startHandle)

// Calculate the exclusive upper bound
var upperBound kv.Key
if endIncluded {
if endHandle == math.MaxInt64 {
upperBound = t.RecordKey(endHandle).PrefixNext()
} else {
// PrefixNext is time costing. Try to avoid it if possible.
upperBound = t.RecordKey(endHandle + 1)
}
} else {
upperBound = t.RecordKey(endHandle)
}

it, err := snap.Iter(firstKey, upperBound)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Expand Up @@ -196,7 +196,7 @@ func (d *ddl) getReorgInfo(t *meta.Meta, job *model.Job, tbl table.Table) (*reor
}

// Get the first handle of this table.
err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64,
err = iterateSnapshotRows(d.store, tbl, ver.Ver, math.MinInt64, math.MaxInt64, true,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
info.Handle = h
return false, nil
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/scan.go
Expand Up @@ -96,6 +96,12 @@ func (s *Scanner) Next() error {
continue
}
}
current := s.cache[s.idx]
if len(s.endKey) > 0 && kv.Key(current.Key).Cmp(kv.Key(s.endKey)) >= 0 {
s.eof = true
s.Close()
return nil
}
if err := s.resolveCurrentLock(bo); err != nil {
s.Close()
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion structure/hash.go
Expand Up @@ -238,7 +238,7 @@ func (t *TxStructure) HClear(key []byte) error {

func (t *TxStructure) iterateHash(key []byte, fn func(k []byte, v []byte) error) error {
dataPrefix := t.hashDataKeyPrefix(key)
it, err := t.reader.Iter(dataPrefix, nil)
it, err := t.reader.Iter(dataPrefix, dataPrefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 5 additions & 3 deletions table/tables/index.go
Expand Up @@ -232,7 +232,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, m kv.Mutator, indexedValues

// Drop removes the KV index from store.
func (c *index) Drop(rm kv.RetrieverMutator) error {
it, err := rm.Iter(c.prefix, nil)
it, err := rm.Iter(c.prefix, c.prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -261,7 +261,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues
if err != nil {
return nil, false, errors.Trace(err)
}
it, err := r.Iter(key, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(key, upperBound)
if err != nil {
return nil, false, errors.Trace(err)
}
Expand All @@ -275,7 +276,8 @@ func (c *index) Seek(sc *stmtctx.StatementContext, r kv.Retriever, indexedValues

// SeekFirst returns an iterator which points to the first entry of the KV index.
func (c *index) SeekFirst(r kv.Retriever) (iter table.IndexIterator, err error) {
it, err := r.Iter(c.prefix, nil)
upperBound := c.prefix.PrefixNext()
it, err := r.Iter(c.prefix, upperBound)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions table/tables/tables.go
Expand Up @@ -701,7 +701,8 @@ func (t *Table) buildIndexForRow(ctx sessionctx.Context, rm kv.RetrieverMutator,
// IterRecords implements table.Table IterRecords interface.
func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := ctx.Txn().Iter(startKey, nil)
prefix := t.RecordPrefix()
it, err := ctx.Txn().Iter(startKey, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand All @@ -717,7 +718,6 @@ func (t *Table) IterRecords(ctx sessionctx.Context, startKey kv.Key, cols []*tab
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
defaultVals := make([]types.Datum, len(cols))
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
Expand Down Expand Up @@ -831,7 +831,7 @@ func (t *Table) RebaseAutoID(ctx sessionctx.Context, newBase int64, isSetStep bo
// Seek implements table.Table Seek interface.
func (t *Table) Seek(ctx sessionctx.Context, h int64) (int64, bool, error) {
seekKey := tablecodec.EncodeRowKeyWithHandle(t.ID, h)
iter, err := ctx.Txn().Iter(seekKey, nil)
iter, err := ctx.Txn().Iter(seekKey, t.RecordPrefix().PrefixNext())
if !iter.Valid() || !iter.Key().HasPrefix(t.RecordPrefix()) {
// No more records in the table, skip to the end.
return 0, false, nil
Expand Down
6 changes: 4 additions & 2 deletions util/admin/admin.go
Expand Up @@ -541,7 +541,10 @@ func rowWithCols(sessCtx sessionctx.Context, txn kv.Retriever, t table.Table, h

func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Table, startKey kv.Key, cols []*table.Column,
fn table.RecordIterFunc) error {
it, err := retriever.Iter(startKey, nil)
prefix := t.RecordPrefix()
keyUpperBound := prefix.PrefixNext()

it, err := retriever.Iter(startKey, keyUpperBound)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -557,7 +560,6 @@ func iterRecords(sessCtx sessionctx.Context, retriever kv.Retriever, t table.Tab
for _, col := range cols {
colMap[col.ID] = &col.FieldType
}
prefix := t.RecordPrefix()
for it.Valid() && it.Key().HasPrefix(prefix) {
// first kv pair is row lock information.
// TODO: check valid lock
Expand Down
4 changes: 2 additions & 2 deletions util/prefix_helper.go
Expand Up @@ -26,7 +26,7 @@ import (

// ScanMetaWithPrefix scans metadata with the prefix.
func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Key, []byte) bool) error {
iter, err := retriever.Iter(prefix, nil)
iter, err := retriever.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -56,7 +56,7 @@ func ScanMetaWithPrefix(retriever kv.Retriever, prefix kv.Key, filter func(kv.Ke
// DelKeyWithPrefix deletes keys with prefix.
func DelKeyWithPrefix(rm kv.RetrieverMutator, prefix kv.Key) error {
var keys []kv.Key
iter, err := rm.Iter(prefix, nil)
iter, err := rm.Iter(prefix, prefix.PrefixNext())
if err != nil {
return errors.Trace(err)
}
Expand Down