Serialization now uses compressed tags when possible #745
Conversation
src/coordinator/storage/converter.go
Outdated
```go
	"fmt"
	"time"

	"github.com/m3db/m3coordinator/util/execution"
```

nit: newline
src/coordinator/storage/converter.go
Outdated
```go
const (
	initRawFetchAllocSize = 32
	workerPoolSize        = 10
```

Will we want to make this config-driven in the future? If so, maybe add a comment.
```go
	datapoints := make(ts.Datapoints, 0, initRawFetchAllocSize)
	for iter.Next() {
		dp, _, _ := iter.Current()
		datapoints = append(datapoints, ts.Datapoint{Timestamp: dp.Timestamp, Value: dp.Value})
```

Since you're pre-allocating above, is it better to use an index? e.g.

```go
for i := 0; iter.Next(); i++ {
	dp, _, _ := iter.Current()
	datapoints[i] = ts.Datapoint{Timestamp: dp.Timestamp, Value: dp.Value}
}
```

I need it to be able to expand if initRawFetchAllocSize is too small.
src/coordinator/storage/converter.go
Outdated
```go
	result *ts.Series
}

func (w *decompressRequest) Process(ctx context.Context) error {
```

nit: add a comment here

Surprised the linter let me get away with that.
Codecov Report
```
@@            Coverage Diff            @@
##           master     #745     +/-  ##
=========================================
- Coverage   79.43%   79.14%     -0.3%
=========================================
  Files         280      283        +3
  Lines       26569    26844      +275
=========================================
+ Hits        21105    21245      +140
- Misses       4186     4313      +127
- Partials     1278     1286        +8
```

Continue to review the full report at Codecov.
src/coordinator/storage/converter.go
Outdated
```go
	iterLength := seriesIterators.Len()

	seriesList := make([]*ts.Series, 0, iterLength)
	div, remainder := iterLength/workerPoolSize, iterLength%workerPoolSize
```

This logic should probably not live here. Maybe consider moving it within execution.ExecuteParallel?
src/coordinator/storage/converter.go
Outdated
```go
	iterLength := seriesIterators.Len()

	seriesList := make([]*ts.Series, 0, iterLength)
	div, remainder := iterLength/workerPoolSize, iterLength%workerPoolSize
```

Maybe push the responsibility of providing the worker pool size to the caller; that way we can eventually centrally control all these resources.
src/coordinator/storage/converter.go
Outdated
```go
	seriesList := make([]*ts.Series, 0, iterLength)
	div, remainder := iterLength/workerPoolSize, iterLength%workerPoolSize
	var executionPools [][]execution.Request
	if remainder > 0 {
```

I'm hoping we can leverage a library for this instead of doing the math ourselves.
@@ -40,6 +40,17 @@ func toTime(t int64) time.Time {

```go
	return storage.TimestampToTime(t)
}

func encodeTags(tagMap map[string]string) []*rpc.Tag {
```

why not just have []rpc.Tag?

Required to conform to what protobuf expects.
@@ -80,6 +91,15 @@ func DecodeFetchResult(_ context.Context, rpcSeries []*rpc.Series) ([]*ts.Series

```go
	return tsSeries, nil
}

func decodeTags(tags []*rpc.Tag) models.Tags {
```

why not just have []rpc.Tag?

Required to conform to what protobuf expects.
```go
	if iterPools == nil {
		return nil, false
	}
	encoder := iterPools.TagEncoder().Get()
```

nil check?

That implies a failing in the iterPools, but I'll add a sanity check.
```go
}

func tagsFromTagIterator(tagIter ident.TagIterator) ([]*rpc.Tag, error) {
	tags := make([]*rpc.Tag, 0, tagIter.Remaining())
```

avoid pointer slices

Required to conform to what protobuf expects.

Would we ever need to use this func elsewhere? Might be worth it to export it?

Unlikely, as this casts directly to the rpc.Tag type, which should only be used for cross-DC stuff.
```go
	var tags []*rpc.Tag
	var err error
	if !canCompress {
```

Maybe not rely on an error to switch logic? Ideally, if we have compressed datapoints we should always have compressed tags; otherwise, it's confusing.

This is going the other way: if we can make compressedTags, we do; otherwise we use regular tags.
```go
	compressedTags, canCompress := compressedTagsFromTagIteratorWithEncoder(tagIter, iterPools)

	var tags []*rpc.Tag
	var err error
```

Suggestion:

```go
var (
	tags []*rpc.Tag
	err  error
)
```
```go
	iterators encoding.SeriesIterators,
	iterPools encoding.IteratorPools,
) (*rpc.FetchResult, error) {
	iters := iterators.Iters()
```

defer iterators.Close()?

That's going to be the responsibility of the caller, since tag, ID, and namespace bytes are shared between the iterators and *rpc.FetchResult.
```go
func tagIteratorFromSeries(series *rpc.Series, iteratorPools encoding.IteratorPools) (ident.TagIterator, error) {
	var (
		idPool ident.Pool
	)
```

newline
```go
	var (
		pooledIterators encoding.MutableSeriesIterators
		seriesIterators []encoding.SeriesIterator
		numSeries       = len(rpcSeries)
```

newline
```go
	"github.com/m3db/m3db/src/coordinator/util/logging"

	"github.com/golang/mock/gomock"
	"github.com/m3db/m3x/ident"
```

nit: import order
```go
	results, err := SeriesIteratorsToFetchResult(ctx, iters, ident.StringID("strID"))
	assert.NoError(t, err)

```

remove newline
```go
	"github.com/golang/mock/gomock"
	"github.com/m3db/m3x/ident"
```

import order

So eventually we want both local and remote storages to return blocks. Is that going to be a separate diff? Once we start dealing only with blocks, how much of this is going to be used?
src/coordinator/storage/converter.go
Outdated
```go
	return decompressSequentially(iterLength, iters, namespace)
}

var wg sync.WaitGroup
```

Can you extract this out into a separate function and maybe call it decompressConcurrently?
```go
	err error
)

if !canCompress {
```

It's strange to make it optional. Clients might be simpler if compressed datapoints always correspond to compressed tags as well. Clients can then always go one path instead of switching based on error.
```go
	decoder := iterPools.TagDecoder().Get()
	decoder.Reset(checkedBytes)
	defer decoder.Close()
	return decoder.Duplicate(), nil
```

Can you comment about why duplicate?
The meat of this will lead to returning encoding.SeriesIterators; these will be passed through the conversion function in #726 to return the blocks we need.
@@ -226,6 +289,18 @@ func tagIteratorFromCompressedTags(compressedTags []*rpc.CompressedTag, idPool i

```go
	return tagIter
}

func tagIteratorFromSeries(series *rpc.Series, iteratorPools encoding.IteratorPools) (ident.TagIterator, error) {
	compressedValues := series.GetCompressed()
	if len(compressedValues.GetCompressedTags()) > 0 {
```

nil check for compressedValues? Might be better to check for isSet?

Good call on the nil check; for some reason I didn't think .GetCompressed() could return nil.
```go
	initialize.Do(initializeVars)

	// Attempt to decompress compressed tags first as this is the only scenario that is expected to fail
	tagIter, err := tagIteratorFromSeries(timeSeries, iteratorPools)
```

tagIteratorFromSeries can either decompress tags or create them using series.GetTags(). Should we only allow the first method? This would enforce that if the series is compressed, then its tags will also always be compressed.

Not necessarily, since series can be compressed but not have compressed tags if no iterator pools are passed in.

looks good