Skip to content

Commit

Permalink
fix: Ensure Index.Walk fetches matching foreign keys only
Browse files Browse the repository at this point in the history
This commit modifies the behaviour of the indexWalk function to ensure
it parses the key parts and matches the foreign key exactly.

Closes #20096
  • Loading branch information
stuartcarnie committed Nov 22, 2020
1 parent 668db04 commit e6a5707
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 8 deletions.
18 changes: 12 additions & 6 deletions kv/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ func (i *Index) sourceBucket(tx Tx) (Bucket, error) {
return tx.Bucket(i.SourceBucket())
}

func indexKey(foreignKey, primaryKey []byte) (newKey []byte) {
// IndexKey returns a value suitable for use as the key component
// when storing values in the index.
func IndexKey(foreignKey, primaryKey []byte) (newKey []byte) {
newKey = make([]byte, len(primaryKey)+len(foreignKey)+1)
copy(newKey, foreignKey)
newKey[len(foreignKey)] = '/'
Expand Down Expand Up @@ -157,7 +159,7 @@ func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
return err
}

return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey)
return bkt.Put(IndexKey(foreignKey, primaryKey), primaryKey)
}

// Delete removes the foreignKey and primaryKey mapping from the underlying index.
Expand All @@ -167,7 +169,7 @@ func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
return err
}

return bkt.Delete(indexKey(foreignKey, primaryKey))
return bkt.Delete(IndexKey(foreignKey, primaryKey))
}

// Walk walks the source bucket using keys found in the index using the provided foreign key
Expand Down Expand Up @@ -195,16 +197,20 @@ func (i *Index) Walk(ctx context.Context, tx Tx, foreignKey []byte, visitFn Visi
return err
}

return indexWalk(ctx, cursor, sourceBucket, visitFn)
return indexWalk(ctx, foreignKey, cursor, sourceBucket, visitFn)
}

// indexWalk consumes the indexKey and primaryKey pairs in the index bucket and looks up their
// associated primaryKey's value in the provided source bucket.
// When an item is located in the source, the provided visit function is called with primary key and associated value.
func indexWalk(ctx context.Context, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
func indexWalk(ctx context.Context, foreignKey []byte, indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
var keys [][]byte
for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
keys = append(keys, pk)
if fk, _, err := indexKeyParts(ik); err != nil {
return err
} else if string(fk) == string(foreignKey) {
keys = append(keys, pk)
}
}

if err := indexCursor.Err(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions kv/index_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (i *IndexMigration) Populate(ctx context.Context, store Store) (n int, err

for fk, fkm := range diff.MissingFromIndex {
for pk := range fkm {
batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})
batch = append(batch, [2][]byte{IndexKey([]byte(fk), []byte(pk)), []byte(pk)})

if len(batch) >= i.operationBatchSize {
if err := flush(batch); err != nil {
Expand Down Expand Up @@ -183,7 +183,7 @@ func (i *IndexMigration) remove(ctx context.Context, store Store, mappings map[s

for fk, fkm := range mappings {
for pk := range fkm {
batch = append(batch, indexKey([]byte(fk), []byte(pk)))
batch = append(batch, IndexKey([]byte(fk), []byte(pk)))

if len(batch) >= i.operationBatchSize {
if err := flush(batch); err != nil {
Expand Down
101 changes: 101 additions & 0 deletions kv/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ import (
"os"
"testing"

"github.com/golang/mock/gomock"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/kv"
"github.com/influxdata/influxdb/v2/kv/mock"
influxdbtesting "github.com/influxdata/influxdb/v2/testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)

Expand All @@ -33,6 +38,102 @@ func Test_Bolt_Index(t *testing.T) {
influxdbtesting.TestIndex(t, s)
}

func TestIndex_Walk(t *testing.T) {
t.Run("only selects exact keys", func(t *testing.T) {
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

type keyValue struct{ key, val []byte }
makeIndexKV := func(fk, pk string) keyValue {
return keyValue{
key: kv.IndexKey([]byte(fk), []byte(pk)),
val: []byte(pk),
}
}

makeKV := func(key, val string) keyValue {
return keyValue{[]byte(key), []byte(val)}
}

var (
sourceBucket = []byte("source")
indexBucket = []byte("index")
foreignKey = []byte("jenkins")
idxkeyvals = []keyValue{
makeIndexKV("jenkins-aws", "pk1"),
makeIndexKV("jenkins-aws", "pk2"),
makeIndexKV("jenkins-aws", "pk3"),
makeIndexKV("jenkins", "pk4"),
makeIndexKV("jenkins", "pk5"),
}
srckeyvals = []struct{ key, val []byte }{
makeKV("pk4", "val4"),
makeKV("pk5", "val5"),
}
)

mapping := kv.NewIndexMapping(sourceBucket, indexBucket, func(data []byte) ([]byte, error) {
return nil, nil
})

tx := mock.NewMockTx(ctrl)

src := mock.NewMockBucket(ctrl)
src.EXPECT().
GetBatch(srckeyvals[0].key, srckeyvals[1].key).
Return([][]byte{srckeyvals[0].val, srckeyvals[1].val}, nil)

tx.EXPECT().
Bucket(sourceBucket).
Return(src, nil)

idx := mock.NewMockBucket(ctrl)
tx.EXPECT().
Bucket(indexBucket).
Return(idx, nil)

cur := mock.NewMockForwardCursor(ctrl)

i := 0
cur.EXPECT().
Next().
DoAndReturn(func() ([]byte, []byte) {
var k, v []byte
if i < len(idxkeyvals) {
elem := idxkeyvals[i]
i++
k, v = elem.key, elem.val
}

return k, v
}).
Times(len(idxkeyvals) + 1)
cur.EXPECT().
Err().
Return(nil)
cur.EXPECT().
Close().
Return(nil)
idx.EXPECT().
ForwardCursor(foreignKey, gomock.Any()).
Return(cur, nil)

ctx := context.Background()
index := kv.NewIndex(mapping, kv.WithIndexReadPathEnabled)

j := 0
err := index.Walk(ctx, tx, foreignKey, func(k, v []byte) (bool, error) {
require.Less(t, j, len(srckeyvals))
assert.Equal(t, srckeyvals[j].key, k)
assert.Equal(t, srckeyvals[j].val, v)
j++
return true, nil
})

assert.NoError(t, err)
})
}

func Benchmark_Inmem_Index_Walk(b *testing.B) {
influxdbtesting.BenchmarkIndexWalk(b, inmem.NewKVStore(), 1000, 200)
}
Expand Down
135 changes: 135 additions & 0 deletions kv/mock/bucket.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit e6a5707

Please sign in to comment.