From 77ffa29670535459e5dd0f126a2164424ac5ad58 Mon Sep 17 00:00:00 2001 From: Bo Du Date: Fri, 28 Feb 2020 16:40:53 -0500 Subject: [PATCH] Address PR comments. --- src/dbnode/storage/index.go | 2 +- src/dbnode/storage/index/block.go | 6 +++- src/m3ninx/index/segment/builder/builder.go | 29 +++++++++++++------ .../segment/builder/sharded_fields_map.go | 9 ------ 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index bae698a096..97bb398a11 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -748,10 +748,10 @@ func (i *nsIndex) Flush( builderOpts := i.opts.IndexOptions().SegmentBuilderOptions() builder, err := builder.NewBuilderFromDocuments(builderOpts) - defer builder.Close() if err != nil { return err } + defer builder.Close() var evicted int for _, block := range flushable { diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index e156e9b66f..b0e90f13d5 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -717,7 +717,11 @@ func (b *block) cleanupForegroundCompactWithLock() { // Free segment builder resources. if b.compact.segmentBuilder != nil { - b.compact.segmentBuilder.Close() + if err := b.compact.segmentBuilder.Close(); err != nil { + instrument.EmitAndLogInvariantViolation(b.iopts, func(l *zap.Logger) { + l.Error("error closing index block segment builder", zap.Error(err)) + }) + } b.compact.segmentBuilder = nil } } diff --git a/src/m3ninx/index/segment/builder/builder.go b/src/m3ninx/index/segment/builder/builder.go index 891b28e7c8..5ab62e7f41 100644 --- a/src/m3ninx/index/segment/builder/builder.go +++ b/src/m3ninx/index/segment/builder/builder.go @@ -30,13 +30,13 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/util" - "go.uber.org/atomic" "github.com/cespare/xxhash" ) var ( errDocNotFound = errors.New("doc not found") + errClosed = errors.New("builder closed") ) const ( @@ -55,6 +55,11 @@ type indexJob struct { batchErr *index.BatchPartialError } +type builderStatus struct { + sync.RWMutex + closed bool +} + type builder struct { opts Options newUUIDFn util.NewUUIDFn @@ -68,7 +73,7 @@ type builder struct { uniqueFields [][][]byte indexQueues []chan indexJob - closed *atomic.Bool + status builderStatus } // NewBuilderFromDocuments returns a builder from documents, it is @@ -87,7 +92,6 @@ func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, e }), uniqueFields: make([][][]byte, 0, concurrency), indexQueues: make([]chan indexJob, 0, concurrency), - closed: atomic.NewBool(false), } for i := 0; i < concurrency; i++ { @@ -146,6 +150,13 @@ func (b *builder) Insert(d doc.Document) ([]byte, error) { } func (b *builder) InsertBatch(batch index.Batch) error { + b.status.RLock() + defer b.status.RUnlock() + + if b.status.closed { + return errClosed + } + // NB(r): This is all kept in a single method to make the // insertion path fast. var wg sync.WaitGroup @@ -213,10 +224,6 @@ func (b *builder) index( i int, batchErr *index.BatchPartialError, ) { - // Do-nothing if we are already closed to avoid send on closed panics. - if b.closed.Load() { - return - } wg.Add(1) // NB(bodu): To avoid locking inside of the terms, we shard the work // by field name. @@ -291,7 +298,9 @@ func (b *builder) Fields() (segment.FieldsIterator, error) { } func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { - terms, ok := b.fields.Get(field) + // NB(bodu): The # of indexQueues and field map shards are equal. + shard := int(xxhash.Sum64(field) % uint64(len(b.indexQueues))) + terms, ok := b.fields.ShardedGet(shard, field) if !ok { return nil, fmt.Errorf("field not found: %s", string(field)) } @@ -304,9 +313,11 @@ func (b *builder) Terms(field []byte) (segment.TermsIterator, error) { } func (b *builder) Close() error { - b.closed.Store(true) + b.status.Lock() + defer b.status.Unlock() for _, q := range b.indexQueues { close(q) } + b.status.closed = true return nil } diff --git a/src/m3ninx/index/segment/builder/sharded_fields_map.go b/src/m3ninx/index/segment/builder/sharded_fields_map.go index a493939a15..ffa452c94c 100644 --- a/src/m3ninx/index/segment/builder/sharded_fields_map.go +++ b/src/m3ninx/index/segment/builder/sharded_fields_map.go @@ -39,15 +39,6 @@ func newShardedFieldsMap( } } -func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) { - for _, fieldMap := range s.data { - if t, found := fieldMap.Get(k); found { - return t, found - } - } - return nil, false -} - func (s *shardedFieldsMap) ShardedGet( shard int, k []byte,