Skip to content

Commit

Permalink
Address PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 28, 2020
1 parent f8a84d8 commit 77ffa29
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,10 +748,10 @@ func (i *nsIndex) Flush(

builderOpts := i.opts.IndexOptions().SegmentBuilderOptions()
builder, err := builder.NewBuilderFromDocuments(builderOpts)
defer builder.Close()
if err != nil {
return err
}
defer builder.Close()

var evicted int
for _, block := range flushable {
Expand Down
6 changes: 5 additions & 1 deletion src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,11 @@ func (b *block) cleanupForegroundCompactWithLock() {

// Free segment builder resources.
if b.compact.segmentBuilder != nil {
b.compact.segmentBuilder.Close()
if err := b.compact.segmentBuilder.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) {
l.Error("error closing index block segment builder", zap.Error(err))
})
}
b.compact.segmentBuilder = nil
}
}
Expand Down
29 changes: 20 additions & 9 deletions src/m3ninx/index/segment/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ import (
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"
"go.uber.org/atomic"

"github.com/cespare/xxhash"
)

var (
errDocNotFound = errors.New("doc not found")
errClosed = errors.New("builder closed")
)

const (
Expand All @@ -55,6 +55,11 @@ type indexJob struct {
batchErr *index.BatchPartialError
}

type builderStatus struct {
sync.RWMutex
closed bool
}

type builder struct {
opts Options
newUUIDFn util.NewUUIDFn
Expand All @@ -68,7 +73,7 @@ type builder struct {
uniqueFields [][][]byte

indexQueues []chan indexJob
closed *atomic.Bool
status builderStatus
}

// NewBuilderFromDocuments returns a builder from documents, it is
Expand All @@ -87,7 +92,6 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e
}),
uniqueFields: make([][][]byte, 0, concurrency),
indexQueues: make([]chan indexJob, 0, concurrency),
closed: atomic.NewBool(false),
}

for i := 0; i < concurrency; i++ {
Expand Down Expand Up @@ -146,6 +150,13 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) {
}

func (b *builder) InsertBatch(batch index.Batch) error {
b.status.RLock()
defer b.status.RUnlock()

if b.status.closed {
return errClosed
}

// NB(r): This is all kept in a single method to make the
// insertion path fast.
var wg sync.WaitGroup
Expand Down Expand Up @@ -213,10 +224,6 @@ func (b *builder) index(
i int,
batchErr *index.BatchPartialError,
) {
// Do-nothing if we are already closed to avoid send on closed panics.
if b.closed.Load() {
return
}
wg.Add(1)
// NB(bodu): To avoid locking inside of the terms, we shard the work
// by field name.
Expand Down Expand Up @@ -291,7 +298,9 @@ func (b *builder) Fields() (segment.FieldsIterator, error) {
}

func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
terms, ok := b.fields.Get(field)
// NB(bodu): The # of indexQueues and field map shards are equal.
shard := int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
terms, ok := b.fields.ShardedGet(shard, field)
if !ok {
return nil, fmt.Errorf("field not found: %s", string(field))
}
Expand All @@ -304,9 +313,11 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
}

func (b *builder) Close() error {
b.closed.Store(true)
b.status.Lock()
defer b.status.Unlock()
for _, q := range b.indexQueues {
close(q)
}
b.status.closed = true
return nil
}
9 changes: 0 additions & 9 deletions src/m3ninx/index/segment/builder/sharded_fields_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ func newShardedFieldsMap(
}
}

func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) {
for _, fieldMap := range s.data {
if t, found := fieldMap.Get(k); found {
return t, found
}
}
return nil, false
}

func (s *shardedFieldsMap) ShardedGet(
shard int,
k []byte,
Expand Down

0 comments on commit 77ffa29

Please sign in to comment.