Skip to content

Commit

Permalink
lightning: continue this region round and retry on next round when Ti…
Browse files Browse the repository at this point in the history
…KV is busy (#40278)

* finish

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix some comments

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix CI

Signed-off-by: lance6716 <lance6716@gmail.com>

* address comment

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix errdoc

Signed-off-by: lance6716 <lance6716@gmail.com>

* revert a change

Signed-off-by: lance6716 <lance6716@gmail.com>

* add lightning debug log

Signed-off-by: lance6716 <lance6716@gmail.com>

* fix bug

Signed-off-by: lance6716 <lance6716@gmail.com>

* address comment

Signed-off-by: lance6716 <lance6716@gmail.com>

Signed-off-by: lance6716 <lance6716@gmail.com>
Co-authored-by: Weizhen Wang <wangweizhen@pingcap.com>
  • Loading branch information
lance6716 and hawkingrei committed Jan 12, 2023
1 parent 22b43ff commit aef752a
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 112 deletions.
26 changes: 26 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,32 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
return newDupDetectIter(ctx, e.db, e.keyAdapter, opts, e.duplicateDB, logger)
}

func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
opt := &pebble.IterOptions{
LowerBound: lowerBound,
UpperBound: upperBound,
}

iter := e.newKVIter(context.Background(), opt)
//nolint: errcheck
defer iter.Close()
// Needs seek to first because NewIter returns an iterator that is unpositioned
hasKey := iter.First()
if iter.Error() != nil {
return nil, nil, errors.Annotate(iter.Error(), "failed to read the first key")
}
if !hasKey {
return nil, nil, nil
}
firstKey := append([]byte{}, iter.Key()...)
iter.Last()
if iter.Error() != nil {
return nil, nil, errors.Annotate(iter.Error(), "failed to seek to the last key")
}
lastKey := append([]byte{}, iter.Key()...)
return firstKey, lastKey, nil
}

type sstMeta struct {
path string
minKey []byte
Expand Down
34 changes: 34 additions & 0 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,37 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
},
}), errorEngineClosed)
}

func TestGetFirstAndLastKey(t *testing.T) {
db, tmpPath := makePebbleDB(t, nil)
f := &Engine{
db: db,
sstDir: tmpPath,
}
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
err = db.Set([]byte("c"), []byte("c"), nil)
require.NoError(t, err)
err = db.Set([]byte("e"), []byte("e"), nil)
require.NoError(t, err)

first, last, err := f.getFirstAndLastKey(nil, nil)
require.NoError(t, err)
require.Equal(t, []byte("a"), first)
require.Equal(t, []byte("e"), last)

first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("d"))
require.NoError(t, err)
require.Equal(t, []byte("c"), first)
require.Equal(t, []byte("c"), last)

first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("f"))
require.NoError(t, err)
require.Equal(t, []byte("c"), first)
require.Equal(t, []byte("e"), last)

first, last, err = f.getFirstAndLastKey([]byte("y"), []byte("z"))
require.NoError(t, err)
require.Nil(t, first)
require.Nil(t, last)
}

0 comments on commit aef752a

Please sign in to comment.