Skip to content

Commit 4883980

Browse files
committed
fixed
1 parent 9c09e15 commit 4883980

File tree

11 files changed

+1267
-1126
lines changed

11 files changed

+1267
-1126
lines changed

posting/index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
10211021
return nil, err
10221022
}
10231023

1024-
memoryLayer.del(key)
1024+
MemLayerInstance.del(key)
10251025
return &bpb.KVList{Kv: kvs}, nil
10261026
}
10271027
tmpStream.Send = func(buf *z.Buffer) error {

posting/index_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,
261261

262262
func TestCountReverseIndexWithData(t *testing.T) {
263263
require.NoError(t, pstore.DropAll())
264-
memoryLayer.clear()
264+
MemLayerInstance.clear()
265265
indexNameCountVal := "testcount: [uid] @count @reverse ."
266266

267267
attr := x.AttrInRootNamespace("testcount")
@@ -296,7 +296,7 @@ func TestCountReverseIndexWithData(t *testing.T) {
296296

297297
func TestCountReverseIndexEmptyPosting(t *testing.T) {
298298
require.NoError(t, pstore.DropAll())
299-
memoryLayer.clear()
299+
MemLayerInstance.clear()
300300
indexNameCountVal := "testcount: [uid] @count @reverse ."
301301

302302
attr := x.AttrInRootNamespace("testcount")

posting/list_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ func TestReadSingleValue(t *testing.T) {
524524
require.NoError(t, err)
525525
require.NoError(t, writePostingListToDisk(kvs))
526526
// Delete item from global cache before reading, as we are not updating the cache in the test
527-
memoryLayer.del(key)
527+
MemLayerInstance.del(key)
528528
ol, err = getNew(key, ps, math.MaxUint64, false)
529529
require.NoError(t, err)
530530
}

posting/lists.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func Init(ps *badger.DB, cacheSize int64, removeOnUpdate bool) {
3939
closer = z.NewCloser(1)
4040
go x.MonitorMemoryMetrics(closer)
4141

42-
memoryLayer = initMemoryLayer(cacheSize, removeOnUpdate)
42+
MemLayerInstance = initMemoryLayer(cacheSize, removeOnUpdate)
4343
}
4444

4545
func SetEnabledDetailedMetrics(enableMetrics bool) {

posting/mvcc.go

Lines changed: 103 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var (
7272
}
7373
)
7474

75-
var memoryLayer *MemoryLayer
75+
var MemLayerInstance *MemoryLayer
7676

7777
func init() {
7878
x.AssertTrue(len(IncrRollup.priorityKeys) == 2)
@@ -325,12 +325,12 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
325325
}
326326

327327
func ResetCache() {
328-
memoryLayer.clear()
328+
MemLayerInstance.clear()
329329
}
330330

331331
// RemoveCacheFor will delete the list corresponding to the given key.
332332
func RemoveCacheFor(key []byte) {
333-
memoryLayer.del(key)
333+
MemLayerInstance.del(key)
334334
}
335335

336336
type Cache struct {
@@ -405,8 +405,104 @@ func (ml *MemoryLayer) del(key []byte) {
405405
ml.cache.del(key)
406406
}
407407

408+
type IterateDiskFunc struct {
409+
Prefix []byte
410+
Prefetch bool
411+
AllVersions bool
412+
ReadTs uint64
413+
Reverse bool
414+
CheckInclusion func(uint64) error
415+
Function func(l *List, pk x.ParsedKey) error
416+
417+
StartKey []byte
418+
}
419+
420+
func (ml *MemoryLayer) IterateDisk(ctx context.Context, f IterateDiskFunc) error {
421+
txn := pstore.NewTransactionAt(f.ReadTs, false)
422+
defer txn.Discard()
423+
424+
itOpt := badger.DefaultIteratorOptions
425+
itOpt.PrefetchValues = f.Prefetch
426+
itOpt.AllVersions = f.AllVersions
427+
itOpt.Reverse = f.Reverse
428+
itOpt.Prefix = f.Prefix
429+
it := txn.NewIterator(itOpt)
430+
defer it.Close()
431+
432+
var prevKey []byte
433+
434+
count := 0
435+
436+
for it.Seek(f.StartKey); it.Valid(); {
437+
item := it.Item()
438+
if bytes.Equal(item.Key(), prevKey) {
439+
it.Next()
440+
continue
441+
}
442+
prevKey = append(prevKey[:0], item.Key()...)
443+
444+
// Parse the key upfront, otherwise ReadPostingList would advance the
445+
// iterator.
446+
pk, err := x.Parse(item.Key())
447+
if err != nil {
448+
return err
449+
}
450+
451+
if pk.HasStartUid {
452+
// The keys holding parts of a split key should not be accessed here because
453+
// they have a different prefix. However, the check is being added to guard
454+
// against future bugs.
455+
continue
456+
}
457+
458+
if item.UserMeta()&BitEmptyPosting > 0 {
459+
// This is an empty posting list. So, it should not be included.
460+
continue
461+
}
462+
463+
err = f.CheckInclusion(pk.Uid)
464+
switch {
465+
case err == ErrNoValue:
466+
continue
467+
case err != nil:
468+
return err
469+
}
470+
471+
count++
472+
473+
if count%100000 == 0 {
474+
select {
475+
case <-ctx.Done():
476+
return ctx.Err()
477+
default:
478+
}
479+
}
480+
481+
l, err := ReadPostingList(item.KeyCopy(nil), it)
482+
if err != nil {
483+
return err
484+
}
485+
empty, err := l.IsEmpty(f.ReadTs, 0)
486+
switch {
487+
case err != nil:
488+
return err
489+
case !empty:
490+
err = f.Function(l, pk)
491+
if err != nil && err != ErrStopIteration {
492+
return err
493+
}
494+
if err == ErrStopIteration {
495+
return nil
496+
}
497+
}
498+
499+
it.Next()
500+
}
501+
return nil
502+
}
503+
408504
func GetStatsHolder() *StatsHolder {
409-
return memoryLayer.statsHolder
505+
return MemLayerInstance.statsHolder
410506
}
411507

412508
func initMemoryLayer(cacheSize int64, removeOnUpdate bool) *MemoryLayer {
@@ -510,9 +606,9 @@ func (txn *Txn) UpdateCachedKeys(commitTs uint64) {
510606
return
511607
}
512608

513-
memoryLayer.wait()
609+
MemLayerInstance.wait()
514610
for key, delta := range txn.cache.deltas {
515-
memoryLayer.updateItemInCache(key, delta, txn.StartTs, commitTs)
611+
MemLayerInstance.updateItemInCache(key, delta, txn.StartTs, commitTs)
516612
}
517613
}
518614

@@ -746,7 +842,7 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List,
746842
return nil, badger.ErrDBClosed
747843
}
748844

749-
l, err := memoryLayer.ReadData(key, pstore, readTs, readUids)
845+
l, err := MemLayerInstance.ReadData(key, pstore, readTs, readUids)
750846
if err != nil {
751847
return l, err
752848
}

posting/mvcc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func TestPostingListRead(t *testing.T) {
201201
require.NoError(t, writer.SetAt(key, []byte{}, BitEmptyPosting, 6))
202202
require.NoError(t, writer.Flush())
203203
// Delete the key from cache as we have just updated it
204-
memoryLayer.del(key)
204+
MemLayerInstance.del(key)
205205
assertLength(7, 0)
206206

207207
addEdgeToUID(t, attr, 1, 4, 7, 8)
@@ -214,7 +214,7 @@ func TestPostingListRead(t *testing.T) {
214214
writer = NewTxnWriter(pstore)
215215
require.NoError(t, writer.SetAt(key, data, BitCompletePosting, 10))
216216
require.NoError(t, writer.Flush())
217-
memoryLayer.del(key)
217+
MemLayerInstance.del(key)
218218
assertLength(10, 0)
219219

220220
addEdgeToUID(t, attr, 1, 5, 11, 12)

protos/pb.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ message Query {
6161
// field. Now, It's been used only for has query.
6262
int32 offset = 16; // offset helps in fetching lesser results for the has query when there is
6363
// no filter and order.
64+
65+
SortMessage order = 17;
6466
}
6567

6668
message ValueList {

0 commit comments

Comments
 (0)