Skip to content

perf(core): improve performance with has filter with order #9439

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
return nil, err
}

memoryLayer.del(key)
MemLayerInstance.del(key)
return &bpb.KVList{Kv: kvs}, nil
}
tmpStream.Send = func(buf *z.Buffer) error {
Expand Down
4 changes: 2 additions & 2 deletions posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64,

func TestCountReverseIndexWithData(t *testing.T) {
require.NoError(t, pstore.DropAll())
memoryLayer.clear()
MemLayerInstance.clear()
indexNameCountVal := "testcount: [uid] @count @reverse ."

attr := x.AttrInRootNamespace("testcount")
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestCountReverseIndexWithData(t *testing.T) {

func TestCountReverseIndexEmptyPosting(t *testing.T) {
require.NoError(t, pstore.DropAll())
memoryLayer.clear()
MemLayerInstance.clear()
indexNameCountVal := "testcount: [uid] @count @reverse ."

attr := x.AttrInRootNamespace("testcount")
Expand Down
2 changes: 1 addition & 1 deletion posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func TestReadSingleValue(t *testing.T) {
require.NoError(t, err)
require.NoError(t, writePostingListToDisk(kvs))
// Delete item from global cache before reading, as we are not updating the cache in the test
memoryLayer.del(key)
MemLayerInstance.del(key)
ol, err = getNew(key, ps, math.MaxUint64, false)
require.NoError(t, err)
}
Expand Down
2 changes: 1 addition & 1 deletion posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func Init(ps *badger.DB, cacheSize int64, removeOnUpdate bool) {
closer = z.NewCloser(1)
go x.MonitorMemoryMetrics(closer)

memoryLayer = initMemoryLayer(cacheSize, removeOnUpdate)
MemLayerInstance = initMemoryLayer(cacheSize, removeOnUpdate)
}

func SetEnabledDetailedMetrics(enableMetrics bool) {
Expand Down
108 changes: 101 additions & 7 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ var (
}
)

var memoryLayer *MemoryLayer
var MemLayerInstance *MemoryLayer

func init() {
x.AssertTrue(len(IncrRollup.priorityKeys) == 2)
Expand Down Expand Up @@ -325,12 +325,12 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
}

func ResetCache() {
memoryLayer.clear()
MemLayerInstance.clear()
}

// RemoveCacheFor will delete the list corresponding to the given key.
func RemoveCacheFor(key []byte) {
memoryLayer.del(key)
MemLayerInstance.del(key)
}

type Cache struct {
Expand Down Expand Up @@ -405,8 +405,102 @@ func (ml *MemoryLayer) del(key []byte) {
ml.cache.del(key)
}

type IterateDiskArgs struct {
Prefix []byte
Prefetch bool
AllVersions bool
ReadTs uint64
Reverse bool
CheckInclusion func(uint64) error
Function func(l *List, pk x.ParsedKey) error

StartKey []byte
}

func (ml *MemoryLayer) IterateDisk(ctx context.Context, f IterateDiskArgs) error {
txn := pstore.NewTransactionAt(f.ReadTs, false)
defer txn.Discard()

itOpt := badger.DefaultIteratorOptions
itOpt.PrefetchValues = f.Prefetch
itOpt.AllVersions = f.AllVersions
itOpt.Reverse = f.Reverse
itOpt.Prefix = f.Prefix
it := txn.NewIterator(itOpt)
defer it.Close()

var prevKey []byte

count := 0

for it.Seek(f.StartKey); it.Valid(); {
item := it.Item()
if bytes.Equal(item.Key(), prevKey) {
it.Next()
continue
}
prevKey = append(prevKey[:0], item.Key()...)

// Parse the key upfront, otherwise ReadPostingList would advance the
// iterator.
pk, err := x.Parse(item.Key())
if err != nil {
return err
}

if pk.HasStartUid {
// The keys holding parts of a split key should not be accessed here because
// they have a different prefix. However, the check is being added to guard
// against future bugs.
continue
}

if item.UserMeta()&BitEmptyPosting > 0 {
// This is an empty posting list. So, it should not be included.
continue
}

err = f.CheckInclusion(pk.Uid)
switch {
case err == ErrNoValue:
continue
case err != nil:
return err
}

count++

if count%100000 == 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
}

l, err := ReadPostingList(item.KeyCopy(nil), it)
if err != nil {
return err
}
empty, err := l.IsEmpty(f.ReadTs, 0)
switch {
case err != nil:
return err
case !empty:
err = f.Function(l, pk)
if err != nil && err != ErrStopIteration {
return err
}
if err == ErrStopIteration {
return nil
}
}
}
return nil
}

func GetStatsHolder() *StatsHolder {
return memoryLayer.statsHolder
return MemLayerInstance.statsHolder
}

func initMemoryLayer(cacheSize int64, removeOnUpdate bool) *MemoryLayer {
Expand Down Expand Up @@ -510,9 +604,9 @@ func (txn *Txn) UpdateCachedKeys(commitTs uint64) {
return
}

memoryLayer.wait()
MemLayerInstance.wait()
for key, delta := range txn.cache.deltas {
memoryLayer.updateItemInCache(key, delta, txn.StartTs, commitTs)
MemLayerInstance.updateItemInCache(key, delta, txn.StartTs, commitTs)
}
}

Expand Down Expand Up @@ -746,7 +840,7 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64, readUids bool) (*List,
return nil, badger.ErrDBClosed
}

l, err := memoryLayer.ReadData(key, pstore, readTs, readUids)
l, err := MemLayerInstance.ReadData(key, pstore, readTs, readUids)
if err != nil {
return l, err
}
Expand Down
4 changes: 2 additions & 2 deletions posting/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestPostingListRead(t *testing.T) {
require.NoError(t, writer.SetAt(key, []byte{}, BitEmptyPosting, 6))
require.NoError(t, writer.Flush())
// Delete the key from cache as we have just updated it
memoryLayer.del(key)
MemLayerInstance.del(key)
assertLength(7, 0)

addEdgeToUID(t, attr, 1, 4, 7, 8)
Expand All @@ -214,7 +214,7 @@ func TestPostingListRead(t *testing.T) {
writer = NewTxnWriter(pstore)
require.NoError(t, writer.SetAt(key, data, BitCompletePosting, 10))
require.NoError(t, writer.Flush())
memoryLayer.del(key)
MemLayerInstance.del(key)
assertLength(10, 0)

addEdgeToUID(t, attr, 1, 5, 11, 12)
Expand Down
5 changes: 4 additions & 1 deletion protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ message Query {
// field. Now, It's been used only for has query.
int32 offset = 16; // offset helps in fetching lesser results for the has query when there is
// no filter and order.

SortMessage order = 17; // Order of the query. It will be used to help reduce the amount of computation
// required to fetch the results.
}

message ValueList {
Expand Down Expand Up @@ -791,4 +794,4 @@ message TaskStatusResponse {
uint64 task_meta = 1;
}

// vim: expandtab sw=2 ts=2
// vim: expandtab sw=2 ts=2
Loading