[dbnode] Concurrent time series indexing within a single batch #2146

Merged
merged 11 commits into master from bdu/concurrent-index on Mar 5, 2020

Conversation

@notbdu (Contributor) commented Feb 13, 2020

What this PR does / why we need it:
Improves indexing performance.

Special notes for your reviewer:

Does this PR introduce a user-facing and/or backwards incompatible change?:

None

Does this PR require updating code package or user-facing documentation?:

None

@notbdu (Contributor Author) commented Feb 13, 2020

[Screenshot: after <-> before CPU flame graphs]

)

var (
	errDocNotFound = errors.New("doc not found")
)

const (
	// Slightly buffer the work to avoid blocking main thread.
	indexQueueSize = 2 << 7
Collaborator:

Nice, yeah perhaps we even increase this to 1024?

Collaborator:

nit: Also I think I've seen the actual number usually present as a comment, i.e. indexQueueSize = 2 << 7 // 128

NoFinalizeKey: true,
})
}
b.Unlock()
Collaborator:

We should actually be OK to avoid locking here if we're sharding by name, yeah?

Contributor Author:

Ah, I thought the Go runtime would complain about concurrent map access even when accessing different keys. Let me try without the lock.

job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
if newField {
b.Lock()
Collaborator:

Guess we can't avoid this lock, however, since unique fields are shared. We could always create a uniqueFields per worker though, and iterate over a list of lists, possibly in the NewOrderedBytesSliceIter iterator (it would need a special implementation of sort.Interface to handle the fact that it's a slice of slices).
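
For reference, a minimal sketch of what such a sort.Interface over a slice of per-worker byte-slice slices could look like (the type name and the flat-index bookkeeping are illustrative, not the PR's eventual implementation):

package builder

import (
	"bytes"
	"sort"
)

// sortableSliceOfSliceOfByteSlicesAsc sorts the union of several [][]byte
// shards in ascending order by treating them as one flat, indexable sequence.
type sortableSliceOfSliceOfByteSlicesAsc struct {
	data [][][]byte
}

var _ sort.Interface = sortableSliceOfSliceOfByteSlicesAsc{}

func (s sortableSliceOfSliceOfByteSlicesAsc) Len() int {
	total := 0
	for _, d := range s.data {
		total += len(d)
	}
	return total
}

func (s sortableSliceOfSliceOfByteSlicesAsc) Less(i, j int) bool {
	return bytes.Compare(s.at(i), s.at(j)) < 0
}

func (s sortableSliceOfSliceOfByteSlicesAsc) Swap(i, j int) {
	iOuter, iInner := s.locate(i)
	jOuter, jInner := s.locate(j)
	s.data[iOuter][iInner], s.data[jOuter][jInner] =
		s.data[jOuter][jInner], s.data[iOuter][iInner]
}

// at resolves a flat index to the underlying byte slice.
func (s sortableSliceOfSliceOfByteSlicesAsc) at(idx int) []byte {
	outer, inner := s.locate(idx)
	return s.data[outer][inner]
}

// locate maps a flat index onto (shard, offset) coordinates.
func (s sortableSliceOfSliceOfByteSlicesAsc) locate(idx int) (int, int) {
	for outer, d := range s.data {
		if idx < len(d) {
			return outer, idx
		}
		idx -= len(d)
	}
	return -1, -1
}

Each comparison pays a linear locate walk across the shards, so a real implementation may want to cache cumulative shard offsets.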

Contributor Author:

I'll look into this.

Contributor Author:

Added sharding for both the unique fields and the fields map. Removed all locks within the index worker.

codecov bot commented Feb 14, 2020

Codecov Report

Merging #2146 into master will increase coverage by 0.8%.
The diff coverage is 81.6%.

Impacted file tree graph

@@           Coverage Diff            @@
##           master   #2146     +/-   ##
========================================
+ Coverage    71.4%   72.2%   +0.8%     
========================================
  Files        1018    1019      +1     
  Lines       88346   88430     +84     
========================================
+ Hits        63085   63890    +805     
+ Misses      20940   20245    -695     
+ Partials     4321    4295     -26
Flag Coverage Δ
#aggregator 82% <ø> (ø) ⬆️
#cluster 85.2% <ø> (-0.2%) ⬇️
#collector 82.8% <ø> (ø) ⬆️
#dbnode 79% <38.4%> (+2%) ⬆️
#m3em 74.4% <ø> (ø) ⬆️
#m3ninx 74.6% <86%> (+0.3%) ⬆️
#m3nsch 51.1% <ø> (ø) ⬆️
#metrics 17.6% <ø> (ø) ⬆️
#msg 74.9% <ø> (ø) ⬆️
#query 68.1% <ø> (ø) ⬆️
#x 83.4% <ø> (+0.2%) ⬆️

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 987db51...ac934c2.

Comment on lines 146 to 150
e.Lock()
defer e.Unlock()
if err.Err == nil {
	return
}
e.errs = append(e.errs, err)
Collaborator:

nit: can probably be a little bit more efficient as:

if err.Err == nil {
 return
}

e.Lock()
e.errs = append(e.errs, err)
e.Unlock()

Also, this seems to only be used in one place; it might be more straightforward to do the locking there rather than adding a mutex on the struct. Take it or leave it, though.

Contributor Author:

Just moved the locking code around.

@@ -78,6 +78,7 @@ var (

func TestBuilderFields(t *testing.T) {
	builder, err := NewBuilderFromDocuments(testOptions)
	defer builder.Close()
Collaborator:

Since this is a test, instead of deferring close here, maybe add require.NoError(t, builder.Close()) at the end of the test?

@@ -105,6 +106,7 @@ func TestBuilderFields(t *testing.T) {

func TestBuilderTerms(t *testing.T) {
	builder, err := NewBuilderFromDocuments(testOptions)
	defer builder.Close()
Collaborator:

Since this is a test, instead of deferring close here, maybe add require.NoError(t, builder.Close()) at the end of the test?

data []*fieldsMap
}

func newShardedFieldsMap(
Collaborator:

nit: may be a good idea to add sanity checks for shardInitialCapacity and numShards
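
For illustration, a hypothetical helper sketching the checks this nit asks for (the clamping policy is an assumption, not the PR's behaviour):

// validateShardingArgs guards the values used to size and pick shards.
func validateShardingArgs(numShards, shardInitialCapacity int) (int, int) {
	if numShards <= 0 {
		numShards = 1 // a zero shard count would break the modulo used to pick a shard
	}
	if shardInitialCapacity < 0 {
		shardInitialCapacity = 0 // make() panics on a negative capacity
	}
	return numShards, shardInitialCapacity
}

newShardedFieldsMap would call this before allocating its per-shard maps.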

Comment on lines 44 to 46
t, found := fieldMap.Get(k)
if found {
	return t, found
}
Collaborator:

nit: maybe more go-ish as

if t, found := fieldMap.Get(k); found {
 return t, true
}

s.data[shard].SetUnsafe(k, v, opts)
}

// ResetTerms keeps fields around but resets the terms set for each one.
Collaborator:

nit: mismatch between comment and func name

@@ -25,29 +25,32 @@ import (
"sort"

"github.com/m3db/m3/src/m3ninx/index/segment"

"github.com/twotwotwo/sorts"
Collaborator:

This is BSD-3, which I'm pretty sure is Apache-2.0 compatible, but just looking to double-check? (Also, I think it may need to be added to our glide file?)

edit: we're good

@@ -86,6 +90,51 @@ func (b *OrderedBytesSliceIter) Close() error {
return nil
}

type sortableSliceOfSliceOfByteSlices struct {
Collaborator:

nit: I think we typically suffix this with the direction (Asc or Desc)

-func NewBuilderFromDocuments(opts Options) (segment.DocumentsBuilder, error) {
-	return &builder{
+func NewBuilderFromDocuments(opts Options) (segment.CloseableDocumentsBuilder, error) {
+	concurrency := runtime.NumCPU()
Collaborator:

Can we make this come from Options and default to runtime.NumCPU()?
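
A sketch of what sourcing this from Options could look like; the Concurrency getter is an assumption here, not an existing accessor on the builder Options:

package builder

import "runtime"

// concurrencyOptions stands in for the builder Options; a Concurrency getter
// like this would need to be added.
type concurrencyOptions interface {
	Concurrency() int
}

// resolveConcurrency returns the configured concurrency when set, otherwise
// defaults to runtime.NumCPU() as suggested above.
func resolveConcurrency(opts concurrencyOptions) int {
	if c := opts.Concurrency(); c > 0 {
		return c
	}
	return runtime.NumCPU()
}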

if shardInitialCapcity > 0 {
	shardInitialCapcity /= concurrency
}
shardUniqueFields := make([][]byte, 0, shardInitialCapcity)
Collaborator:

This may be more performant if you allocate the entire [][]byte as a full block, then index into it; i.e.

shardUniqueFields := make([][]byte, 0, concurrency*shardInitialCapcity)
for i := 0; i < concurrency; i++ {
	// ...
	lo, hi := i*shardInitialCapcity, (i+1)*shardInitialCapcity
	b.uniqueFields = append(b.uniqueFields, shardUniqueFields[lo:lo:hi])
	// ...
}

Contributor Author (@notbdu, Feb 24, 2020):

We wouldn't be able to grow a shard in this case, or at least it would be difficult to do so.

}
shardUniqueFields := make([][]byte, 0, shardInitialCapcity)
b.uniqueFields = append(b.uniqueFields, shardUniqueFields)
b.fields = newShardedFieldsMap(concurrency, shardInitialCapcity)
Collaborator:

May be possible to bulk-allocate these similarly to b.uniqueFields?

Contributor Author:

Same comment as before. Growing a shard would be very painful.

job.batchErr.AddWithLock(index.BatchError{Err: err, Idx: job.idx})
}
if newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
Collaborator:

Should this happen if the post (postings insert) failed?

Contributor Author:

Ah, good call, I'm moving the logic around here.

if newField {
b.uniqueFields[job.shard] = append(b.uniqueFields[job.shard], job.field.Name)
}
b.wg.Done()
Collaborator:

I may be misreading this, but it looks like there's a disconnect between the wg.Add and the wg.Done, where a single wait group may be used by multiple inserts and it's not clear that they complete in time? Rather than adding a wait group on the builder, it may be better to explicitly init one, pass it into the insert, and add it to the indexJob?
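
A rough sketch of the suggested shape, with the wait group owned by the insert call and carried on each job rather than living on the builder (field and function names here are illustrative, not the PR's):

package builder

import "sync"

// indexJob is a pared-down version of the PR's job struct for illustration.
type indexJob struct {
	wg    *sync.WaitGroup
	shard int
	// ... field name, document id, postings offset, etc.
}

// insertBatch enqueues one batch's jobs and waits only for that batch, so
// concurrent batches cannot race on a shared builder-level wait group.
func insertBatch(jobs []indexJob, queues []chan indexJob) {
	var wg sync.WaitGroup
	wg.Add(len(jobs))
	for _, job := range jobs {
		job.wg = &wg
		queues[job.shard] <- job
	}
	wg.Wait()
}

// indexWorker drains a single shard's queue and marks each job done.
func indexWorker(queue <-chan indexJob) {
	for job := range queue {
		// ... index the field/term for the document ...
		job.wg.Done()
	}
}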

@robskillington robskillington changed the title Concurrent indexing. [dbnode] Concurrent batch time series indexing Feb 28, 2020
@robskillington robskillington changed the title [dbnode] Concurrent batch time series indexing [dbnode] Concurrent time series indexing within a single batch Feb 28, 2020
@@ -748,6 +748,7 @@ func (i *nsIndex) Flush(

builderOpts := i.opts.IndexOptions().SegmentBuilderOptions()
builder, err := builder.NewBuilderFromDocuments(builderOpts)
defer builder.Close()
Collaborator:

Need to move this after the if err != nil { return err } check, since if err != nil then builder will be nil and calling builder.Close() will cause a nil pointer panic.

})
// Free segment builder resources.
if b.compact.segmentBuilder != nil {
b.compact.segmentBuilder.Close()
Collaborator:

Need to check the error returned from the segment builder here too, and emit an invariant error if it cannot close.
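
A sketch of the suggested handling, assuming M3's instrument.EmitAndLogInvariantViolation helper is the right tool here (the helper wiring and function name below are assumptions of this sketch):

package index

import (
	"go.uber.org/zap"

	"github.com/m3db/m3/src/x/instrument"
)

// closeSegmentBuilder closes the builder and surfaces a failed close as an
// invariant violation instead of silently dropping the error.
func closeSegmentBuilder(builder interface{ Close() error }, iOpts instrument.Options) {
	if builder == nil {
		return
	}
	if err := builder.Close(); err != nil {
		instrument.EmitAndLogInvariantViolation(iOpts, func(l *zap.Logger) {
			l.Error("unable to close segment builder", zap.Error(err))
		})
	}
}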


"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index"
"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/m3ninx/util"
"go.uber.org/atomic"
Collaborator:

nit: This should be grouped next to other third party imports such as "github.com/cespare/xxhash"


func (s *shardedFieldsMap) Get(k []byte) (*terms, bool) {
	for _, fieldMap := range s.data {
		if t, found := fieldMap.Get(k); found {
Collaborator:

Hm, can we perhaps just hash the k to work out which map to look in?

Otherwise we have to do 32 map accesses (on a 32-core machine) vs checking just one map, and this is done for every single field during a compaction too.

Collaborator:

Or just make the Get(k) take a shard too, i.e. Get(shard int, k []byte)

Contributor Author:

Ah, I already have a ShardedGet method. The (b *builder) Terms(field []byte) call doesn't have the shard pre-computed, so I added this regular Get. But you're right, it looks like it's cheaper to compute the hash than to do multiple map accesses.

Here are some benchmark results:

notbdu @ Bos-MacBook-Pro.local m3 (bdu/concurrent-index) $ go test -v -bench . ./bench_test.go
goos: darwin
goarch: amd64
BenchmarkMapAccess-8    20000000                83.9 ns/op
BenchmarkHashing-8      200000000                6.21 ns/op
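
The quoted bench_test.go isn't part of the PR, but a micro-benchmark along these lines reproduces the comparison (names and the fixed shard count of 32 are assumptions):

package bench

import (
	"testing"

	"github.com/cespare/xxhash"
)

var (
	key   = []byte("some.field.name")
	m     = map[string]int{string(key): 1}
	sinkI int
	sinkU uint64
)

// BenchmarkMapAccess measures a single map lookup keyed by the field name.
func BenchmarkMapAccess(b *testing.B) {
	for n := 0; n < b.N; n++ {
		sinkI = m[string(key)]
	}
}

// BenchmarkHashing measures hashing the field name down to a shard index.
func BenchmarkHashing(b *testing.B) {
	for n := 0; n < b.N; n++ {
		sinkU = xxhash.Sum64(key) % 32
	}
}

Hashing once and then probing a single shard map beats checking every shard's map, which matches the numbers above.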

@@ -751,6 +751,7 @@ func (i *nsIndex) Flush(
if err != nil {
	return err
}
defer builder.Close()
Collaborator:

Good catch.

go b.indexWorker(indexQueue)

// Give each shard a fraction of the configured initial capacity.
shardInitialCapcity := opts.InitialCapacity()
Collaborator:

shardInitialCapcity should be shardInitialCapacity?

@@ -236,7 +298,9 @@ func (b *builder) Fields() (segment.FieldsIterator, error) {
}

func (b *builder) Terms(field []byte) (segment.TermsIterator, error) {
terms, ok := b.fields.Get(field)
// NB(bodu): The # of indexQueues and field map shards are equal.
shard := int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
Collaborator:

Can we make this a method on the builder to reuse this code? I see it replicated on this line and on line 230, i.e.:

func (b *builder) shardForField(field []byte) int {
	return int(xxhash.Sum64(field) % uint64(len(b.indexQueues)))
}

@robskillington (Collaborator) left a comment:

LGTM other than remaining comments

@notbdu notbdu merged commit 7228d90 into master Mar 5, 2020
@notbdu notbdu deleted the bdu/concurrent-index branch March 5, 2020 07:35