Skip to content

Commit

Permalink
Address PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 26, 2020
1 parent e76f621 commit b8c4db4
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/m3ninx/index/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ func (e *BatchPartialError) Add(err BatchError) {

// AddWithLock adds an error to e with a lock. Any nil errors are ignored.
func (e *BatchPartialError) AddWithLock(err BatchError) {
e.Lock()
defer e.Unlock()
if err.Err == nil {
return
}
e.Lock()
e.errs = append(e.errs, err)
e.Unlock()
}

// Errs returns the errors with the indexes of the documents in the batch
Expand Down
28 changes: 17 additions & 11 deletions src/m3ninx/index/segment/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package builder
import (
"errors"
"fmt"
"runtime"
"sync"

"github.com/m3db/m3/src/m3ninx/doc"
Expand All @@ -42,10 +41,12 @@ var (

const (
// Slightly buffer the work to avoid blocking main thread.
indexQueueSize = 2 << 7
indexQueueSize = 2 << 9 // 1024
)

type indexJob struct {
wg *sync.WaitGroup

id postings.ID
field doc.Field

Expand Down Expand Up @@ -75,7 +76,7 @@ type builder struct {
// not thread safe and is optimized for insertion speed and a
// final build step when documents are indexed.
func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
concurrency := runtime.NumCPU()
concurrency := opts.Concurrency()
b := &builder{
opts: opts,
newUUIDFn: opts.NewUUIDFn(),
Expand Down Expand Up @@ -148,6 +149,7 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) {
func (b *builder) InsertBatch(batch index.Batch) error {
// NB(r): This is all kept in a single method to make the
// insertion path fast.
var wg sync.WaitGroup
batchErr := index.NewBatchPartialError()
for i, d := range batch.Docs {
// Validate doc
Expand Down Expand Up @@ -188,9 +190,9 @@ func (b *builder) InsertBatch(batch index.Batch) error {

// Index the terms.
for _, f := range d.Fields {
b.index(postings.ID(postingsListID), f, i, batchErr)
b.index(&wg, postings.ID(postingsListID), f, i, batchErr)
}
b.index(postings.ID(postingsListID), doc.Field{
b.index(&wg, postings.ID(postingsListID), doc.Field{
Name: doc.IDReservedFieldName,
Value: d.ID,
}, i, batchErr)
Expand All @@ -206,6 +208,7 @@ func (b *builder) InsertBatch(batch index.Batch) error {
}

func (b *builder) index(
wg *sync.WaitGroup,
id postings.ID,
f doc.Field,
i int,
Expand All @@ -215,11 +218,12 @@ func (b *builder) index(
if b.closed.Load() {
return
}
b.wg.Add(1)
wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
shard := int(xxhash.Sum64(f.Name) % uint64(len(b.indexQueues)))
b.indexQueues[shard] <- indexJob{
wg: wg,
id: id,
field: f,
shard: shard,
Expand All @@ -244,13 +248,15 @@ func (b *builder) indexWorker(indexQueue chan indexJob) {
// collection for correct response when retrieving all fields.
newField := terms.size() == 0
// NB(bodu): Bulk of the cpu time during insertion is spent inside of terms.post().
if err := terms.post(job.field.Value, job.id); err != nil {
err := terms.post(job.field.Value, job.id)
if err == nil {
if newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
}
} else {
job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
if newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
}
b.wg.Done()
job.wg.Done()
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/m3ninx/index/segment/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ var (

func TestBuilderFields(t *testing.T) {
builder, err := NewBuilderFromDocuments(testOptions)
defer builder.Close()
require.NoError(t, err)
defer require.NoError(t, builder.Close())

for i := 0; i < 10; i++ {
builder.Reset(0)
Expand All @@ -106,8 +106,8 @@ func TestBuilderFields(t *testing.T) {

func TestBuilderTerms(t *testing.T) {
builder, err := NewBuilderFromDocuments(testOptions)
defer builder.Close()
require.NoError(t, err)
defer require.NoError(t, builder.Close())

for i := 0; i < 10; i++ {
builder.Reset(0)
Expand Down
26 changes: 13 additions & 13 deletions src/m3ninx/index/segment/builder/bytes_slice_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type OrderedBytesSliceIter struct {

currentIdx int
current []byte
backingSlices *sortableSliceOfSliceOfByteSlices
backingSlices *sortableSliceOfSliceOfByteSlicesAsc
}

var _ segment.FieldsIterator = &OrderedBytesSliceIter{}
Expand All @@ -46,7 +46,7 @@ var _ segment.FieldsIterator = &OrderedBytesSliceIter{}
func NewOrderedBytesSliceIter(
maybeUnorderedSlices [][][]byte,
) *OrderedBytesSliceIter {
sortable := &sortableSliceOfSliceOfByteSlices{data: maybeUnorderedSlices}
sortable := &sortableSliceOfSliceOfByteSlicesAsc{data: maybeUnorderedSlices}
sorts.ByBytes(sortable)
return &OrderedBytesSliceIter{
currentIdx: -1,
Expand Down Expand Up @@ -90,43 +90,43 @@ func (b *OrderedBytesSliceIter) Close() error {
return nil
}

type sortableSliceOfSliceOfByteSlices struct {
type sortableSliceOfSliceOfByteSlicesAsc struct {
data [][][]byte
length *int
length int
}

func (s *sortableSliceOfSliceOfByteSlices) Len() int {
if s.length != nil {
return *s.length
func (s *sortableSliceOfSliceOfByteSlicesAsc) Len() int {
if s.length > 0 {
return s.length
}

totalLen := 0
for _, innerSlice := range s.data {
totalLen += len(innerSlice)
}
s.length = &totalLen
s.length = totalLen

return *s.length
return s.length
}

func (s *sortableSliceOfSliceOfByteSlices) Less(i, j int) bool {
func (s *sortableSliceOfSliceOfByteSlicesAsc) Less(i, j int) bool {
iOuter, iInner := s.getIndices(i)
jOuter, jInner := s.getIndices(j)
return bytes.Compare(s.data[iOuter][iInner], s.data[jOuter][jInner]) < 0
}

func (s *sortableSliceOfSliceOfByteSlices) Swap(i, j int) {
func (s *sortableSliceOfSliceOfByteSlicesAsc) Swap(i, j int) {
iOuter, iInner := s.getIndices(i)
jOuter, jInner := s.getIndices(j)
s.data[iOuter][iInner], s.data[jOuter][jInner] = s.data[jOuter][jInner], s.data[iOuter][iInner]
}

func (s *sortableSliceOfSliceOfByteSlices) Key(i int) []byte {
func (s *sortableSliceOfSliceOfByteSlicesAsc) Key(i int) []byte {
iOuter, iInner := s.getIndices(i)
return s.data[iOuter][iInner]
}

func (s *sortableSliceOfSliceOfByteSlices) getIndices(idx int) (int, int) {
func (s *sortableSliceOfSliceOfByteSlicesAsc) getIndices(idx int) (int, int) {
currentSliceIdx := 0
for idx >= len(s.data[currentSliceIdx]) {
idx -= len(s.data[currentSliceIdx])
Expand Down
24 changes: 24 additions & 0 deletions src/m3ninx/index/segment/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package builder

import (
"runtime"

"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/postings/roaring"
"github.com/m3db/m3/src/m3ninx/util"
Expand All @@ -30,6 +32,10 @@ const (
defaultInitialCapacity = 128
)

var (
defaultConcurrency = runtime.NumCPU()
)

// Options is a collection of options for segment building.
type Options interface {
// SetNewUUIDFn sets the function used to generate new UUIDs.
Expand All @@ -49,12 +55,19 @@ type Options interface {

// PostingsListPool returns the postings list pool.
PostingsListPool() postings.Pool

// SetConcurrency sets the indexing concurrency.
SetConcurrency(value int) Options

// Concurrency returns the indexing concurrency.
Concurrency() int
}

type opts struct {
newUUIDFn util.NewUUIDFn
initialCapacity int
postingsPool postings.Pool
concurrency int
}

// NewOptions returns new options.
Expand All @@ -63,6 +76,7 @@ func NewOptions() Options {
newUUIDFn: util.NewUUID,
initialCapacity: defaultInitialCapacity,
postingsPool: postings.NewPool(nil, roaring.NewPostingsList),
concurrency: defaultConcurrency,
}
}

Expand Down Expand Up @@ -95,3 +109,13 @@ func (o *opts) SetPostingsListPool(v postings.Pool) Options {
func (o *opts) PostingsListPool() postings.Pool {
return o.postingsPool
}

func (o *opts) SetConcurrency(v int) Options {
opts := *o
opts.concurrency = v
return &opts
}

func (o *opts) Concurrency() int {
return o.concurrency
}
5 changes: 2 additions & 3 deletions src/m3ninx/index/segment/builder/sharded_fields_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func newShardedFieldsMap(

func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) {
for _, fieldMap := range s.data {
t, found := fieldMap.Get(k)
if found {
if t, found := fieldMap.Get(k); found {
return t, found
}
}
Expand All @@ -65,7 +64,7 @@ func (s *shardedFieldsMap) ShardedSetUnsafe(
s.data[shard].SetUnsafe(k, v, opts)
}

// ResetTerms keeps fields around but resets the terms set for each one.
// ResetTermsSets keeps fields around but resets the terms set for each one.
func (s *shardedFieldsMap) ResetTermsSets() {
for _, fieldMap := range s.data {
for _, entry := range fieldMap.Iter() {
Expand Down

0 comments on commit b8c4db4

Please sign in to comment.