Skip to content

Commit

Permalink
gh-1832 [ci skip] WIP: sort at runtime, then merge
Browse files Browse the repository at this point in the history
This is a first step toward changing the storage so that it is
already sorted on disk, which would let us remove the runtime
sort step entirely.
  • Loading branch information
etiennedi committed Feb 25, 2022
1 parent 019e190 commit 3441675
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 18 deletions.
2 changes: 1 addition & 1 deletion adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewIndex(ctx context.Context, config IndexConfig,
cs inverted.ClassSearcher, logger logrus.FieldLogger,
nodeResolver nodeResolver, remoteClient sharding.RemoteIndexClient) (*Index, error) {
// TODO: can't error with hard-coded preset, needs checking once configurable
sd, _ := stopwords.NewDetectorFromPreset("")
sd, _ := stopwords.NewDetectorFromPreset("en")
index := &Index{
Config: config,
Shards: map[string]*Shard{},
Expand Down
11 changes: 9 additions & 2 deletions adapters/repos/db/inverted/bm25_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func (b *BM25Searcher) Object(ctx context.Context, limit int,
idLists[i] = ids
}

before := time.Now()
ids := newScoreMerger(idLists).do()
fmt.Printf("merge scores took %s\n", time.Since(before))

sort.Slice(ids.docIDs, func(a, b int) bool {
return ids.docIDs[a].score > ids.docIDs[b].score
Expand All @@ -99,17 +101,22 @@ func (b *BM25Searcher) Object(ctx context.Context, limit int,

func (b *BM25Searcher) retrieveScoreAndSortForSingleTerm(ctx context.Context,
property, term string) (docPointersWithScore, error) {
before := time.Now()
ids, err := b.getIdsWithFrequenciesForTerm(ctx, property, term)
if err != nil {
return docPointersWithScore{}, errors.Wrap(err,
"read doc ids and their frequencies from inverted index")
}
fmt.Printf("term %q: get ids took %s\n", term, time.Since(before))
fmt.Printf("term %q: %d ids\n", term, len(ids.docIDs))

before = time.Now()
if err := b.score(ids, property); err != nil {
return docPointersWithScore{}, err
}
fmt.Printf("term %q: score ids took %s\n", term, time.Since(before))

before := time.Now()
before = time.Now()
// TODO: this runtime sorting is only because the storage is not implemented
// in an always sorted manner. Once we have that implemented, we can skip
// this expensive runtime-sort
Expand All @@ -118,7 +125,7 @@ func (b *BM25Searcher) retrieveScoreAndSortForSingleTerm(ctx context.Context,
})

// TODO: structured logging
fmt.Printf("TEMP DEBUG: sorting by doc ids took %s\n", time.Since(before))
fmt.Printf("term %q: sorting by doc ids took %s\n", term, time.Since(before))

return ids, nil
}
Expand Down
39 changes: 27 additions & 12 deletions adapters/repos/db/lsmkv/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
package lsmkv

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -255,20 +257,17 @@ func (b *Bucket) MapList(key []byte, cfgs ...MapListOption) ([]MapPair, error) {
cfg(&c)
}

var raw []value

v, err := b.disk.getCollection(key)
before := time.Now()
segments, err := b.disk.getCollectionBySegments(key)
if err != nil {
if err != nil && err != NotFound {
return nil, err
}
}
fmt.Printf("--map-list: get all disk segments took %s\n", time.Since(before))

if len(raw) > 0 {
raw = append(raw, v...)
} else {
raw = v
}
before = time.Now()
fmt.Printf("--map-list: apend all disk segments took %s\n", time.Since(before))

if b.flushing != nil {
v, err := b.flushing.getCollection(key)
Expand All @@ -277,18 +276,34 @@ func (b *Bucket) MapList(key []byte, cfgs ...MapListOption) ([]MapPair, error) {
return nil, err
}
}
raw = append(raw, v...)
segments = append(segments, v)
}

v, err = b.active.getCollection(key)
before = time.Now()
v, err := b.active.getCollection(key)
if err != nil {
if err != nil && err != NotFound {
return nil, err
}
}
raw = append(raw, v...)
segments = append(segments, v)
fmt.Printf("--map-list: get all active segments took %s\n", time.Since(before))

before = time.Now()
for i := range segments {
sort.Slice(segments[i], func(a, b int) bool {
return bytes.Compare(segments[i][a].value, segments[i][b].value) == -1
})
}
fmt.Printf("--map-list: sort all segments took %s\n", time.Since(before))

before = time.Now()
defer func() {
fmt.Printf("--map-list: run decoder took %s\n", time.Since(before))
}()

return newMapDecoder().Do(raw, c.acceptDuplicates)
return nil, nil
// return newMapDecoder().Do(raw, c.acceptDuplicates)
}

func (b *Bucket) MapSet(rowKey []byte, kv MapPair) error {
Expand Down
25 changes: 25 additions & 0 deletions adapters/repos/db/lsmkv/segment_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,31 @@ func (ig *SegmentGroup) getCollection(key []byte) ([]value, error) {
return out, nil
}

// getCollectionBySegments looks up key in every on-disk segment and returns
// the matching values grouped per segment (one inner slice per segment that
// actually contains the key). A segment not containing the key is simply
// skipped; any other lookup error aborts the whole operation.
func (ig *SegmentGroup) getCollectionBySegments(key []byte) ([][]value, error) {
	ig.maintenanceLock.RLock()
	defer ig.maintenanceLock.RUnlock()

	// at most one result slice per segment
	out := make([][]value, 0, len(ig.segments))

	for _, segment := range ig.segments {
		v, err := segment.getCollection(key)
		if err != nil {
			if err == NotFound {
				// key absent from this segment — not an error, move on
				continue
			}

			return nil, err
		}

		out = append(out, v)
	}

	return out, nil
}

func (ig *SegmentGroup) count() int {
ig.maintenanceLock.RLock()
defer ig.maintenanceLock.RUnlock()
Expand Down
6 changes: 3 additions & 3 deletions adapters/repos/db/lsmkv/strategies_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func newMapDecoder() *mapDecoder {
}

func (m *mapDecoder) Do(in []value, acceptDuplicates bool) ([]MapPair, error) {
if acceptDuplicates {
return m.doSimplified(in)
}
// if acceptDuplicates {
// return m.doSimplified(in)
// }

seenKeys := map[string]uint{}
kvs := make([]MapPair, len(in))
Expand Down

0 comments on commit 3441675

Please sign in to comment.