-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability for query storage to provide unconsolidated blocks #929
Conversation
src/query/block/column.go
Outdated
@@ -36,6 +36,11 @@ type columnBlock struct { | |||
seriesMeta []SeriesMeta | |||
} | |||
|
|||
// Unconsolidated returns the unconlidated version for the block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/unconlidated/unconsolidated
src/query/block/column.go
Outdated
@@ -36,6 +36,11 @@ type columnBlock struct { | |||
seriesMeta []SeriesMeta | |||
} | |||
|
|||
// Unconsolidated returns the unconlidated version for the block | |||
func (c *columnBlock) Unconsolidated() (UnconsolidatedBlock, error) { | |||
return nil, fmt.Errorf("unconslidated block not support for block, meta: %s", c.meta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reword error and check spelling
src/query/block/scalar.go
Outdated
@@ -47,6 +47,11 @@ func NewScalar(val float64, bounds models.Bounds) Block { | |||
} | |||
} | |||
|
|||
// Unconsolidated returns the unconlidated version for the block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comments as above
src/query/block/series.go
Outdated
return len(s.datapoints) | ||
} | ||
|
||
// Consolidated conslidates the series |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/conslidates/consolidates
src/query/block/types.go
Outdated
} | ||
|
||
// Type assertion | ||
var _ ConsolidationFunc = TakeLast |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this needed for exactly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to ensure TakeLast actually implements ConsolidationFunc. I guess i can remove it since multiSeriesBlock Consolidate will throw a compilation error if TakeLast doesn't implement it
src/query/executor/transform/lazy.go
Outdated
@@ -145,6 +145,23 @@ type lazyBlock struct { | |||
processedBlock block.Block | |||
} | |||
|
|||
// Unconsolidated returns the unconlidated version for the block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/unconlidated/unconsolidated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/for the block/of the block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use "for the block" in most other places.
src/query/executor/transform/lazy.go
Outdated
return f.processedBlock.Unconsolidated() | ||
} | ||
|
||
err := f.process() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err := f.process(); err != nil {
return nil, err
}
sum := 0.0 | ||
for _, n := range f { | ||
for _, n := range vals { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
src/query/storage/consolidate.go
Outdated
} | ||
|
||
func (c *consolidatedBlock) Unconsolidated() (block.UnconsolidatedBlock, error) { | ||
return nil, fmt.Errorf("unconsolidated blocks are not supported") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errors.New("unconsolidated blocks are not supported")
src/query/storage/consolidate.go
Outdated
return c.unconsolidated.Close() | ||
} | ||
|
||
type consolidateStepIter struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consolidatedStepIter
?
src/query/storage/consolidate.go
Outdated
return c.unconsolidated.Meta() | ||
} | ||
|
||
type consolidateSeriesIter struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consolidatedSeriesIter
?
src/query/block/scalar.go
Outdated
@@ -47,6 +47,11 @@ func NewScalar(val float64, bounds models.Bounds) Block { | |||
} | |||
} | |||
|
|||
// Unconsolidated returns the unconlidated version for the block | |||
func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) { | |||
return nil, fmt.Errorf("unconslidated block not support for block, meta: %s", b.meta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be a notImplemented error, since scalars should be fine to have unconsolidated
return UnconsolidatedSeries{datapoints: datapoints, Meta: meta} | ||
} | ||
|
||
// DatapointsAtStep returns the raw datapoints at a step index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: DatapointAtStep returns the raw datapoint at a step index
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's returning multiple datapoints ?
|
||
// DatapointsAtStep returns the raw datapoints at a step index | ||
func (s UnconsolidatedSeries) DatapointsAtStep(idx int) ts.Datapoints { | ||
return s.datapoints[idx] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should perform bound checks
) | ||
|
||
// Block represents a group of series across a time bound | ||
type Block interface { | ||
// Unconsolidated returns the unconsolidated version of the block | ||
Unconsolidated() (UnconsolidatedBlock, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rather than error it could return a bool to indicate if it can be unconsolidated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even for things which can support unconsolidated, you can still get an error. So we need an error here neverthless. I like this over having a bool so that clients don't check 2 things
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A bit weird; you may want to continue and do something different if you can't consolidate because you errored vs your block being unable to block, but that could be changed later if necessary
func seriesValuesToDatapoints(values []float64, bounds models.Bounds) ts.Datapoints { | ||
dps := make(ts.Datapoints, len(values)) | ||
for i, v := range values { | ||
t, _ := bounds.TimeForIndex(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need the second arg here otherwise you'll have a nil timestamp
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was assuming we'd just provide values within bounds for tests. Didn't want each test to check for error. A better approach i suppose it for tests to pass in testing.T object but that requires too many changes and didn't look very useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad didn't catch these were test utilities
src/query/block/types.go
Outdated
// SeriesMeta returns the metadata for each series in the block | ||
SeriesMeta() []SeriesMeta | ||
// Meta returns the metadata for the block | ||
Meta() Metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can roll these into a single interface and share that with StepIter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, might be a good idea to pull
// SeriesMeta returns the metadata for each series in the block
SeriesMeta() []SeriesMeta
// Meta returns the metadata for the block
Meta() Metadata
into a single interface since a lot of these use them
dps := seriesValuesToDatapoints(values, bounds) | ||
seriesList[i] = ts.NewSeries(meta[i].Name, dps, meta[i].Tags) | ||
} | ||
b, _ := storage.NewMultiSeriesBlock(seriesList, &storage.FetchQuery{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
src/query/storage/consolidate.go
Outdated
* THE SOFTWARE. | ||
*/ | ||
|
||
package storage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to consolidated.go
src/query/storage/consolidate.go
Outdated
return c.unconsolidated.Close() | ||
} | ||
|
||
type consolidateStepIter struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to consolidatedStepIter
src/query/storage/consolidate.go
Outdated
return c.unconsolidated.Meta() | ||
} | ||
|
||
type consolidateSeriesIter struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to consolidatedSeriesIter
for i, values := range seriesValues { | ||
dps := seriesValuesToDatapoints(values, bounds) | ||
seriesList[i] = ts.NewSeries(meta[i].Name, dps, meta[i].Tags) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
src/query/ts/values.go
Outdated
|
||
// Aligned returns values aligned to given bounds. | ||
func (d Datapoints) Aligned(bounds models.Bounds) []Datapoints { | ||
numDatapoints := len(d) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
d.Len()
?
src/query/ts/values.go
Outdated
} | ||
|
||
// Aligned returns values aligned to given bounds. | ||
func (d Datapoints) Aligned(bounds models.Bounds) []Datapoints { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to Align
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@arnikola wanted it AlignToBounds. Is that better ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Early bird gets the worm @benraskin92 😜
4ec7dcb
to
7b7972c
Compare
Codecov Report
@@ Coverage Diff @@
## master #929 +/- ##
===========================================
- Coverage 77.82% 55.88% -21.95%
===========================================
Files 411 408 -3
Lines 34516 34377 -139
===========================================
- Hits 26863 19212 -7651
- Misses 5778 13374 +7596
+ Partials 1875 1791 -84
Continue to review full report at Codecov.
|
src/query/block/column.go
Outdated
@@ -36,6 +36,11 @@ type columnBlock struct { | |||
seriesMeta []SeriesMeta | |||
} | |||
|
|||
// Unconsolidated returns the unconsolidated version for the block | |||
func (c *columnBlock) Unconsolidated() (UnconsolidatedBlock, error) { | |||
return nil, fmt.Errorf("unconsolidated view not support for block, meta: %s", c.meta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/support/supported
func (m multiSeriesBlock) Consolidate() (block.Block, error) { | ||
return &consolidatedBlock{ | ||
unconsolidated: m, | ||
consolidationFunc: block.TakeLast, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll use a different consolidation func in the future, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prometheus only has gauges so TakeLast is only reasonable option. Yeah, we would probably have different once we have different types.
b5230be
to
2ad6e95
Compare
@@ -47,6 +47,11 @@ func NewScalar(val float64, bounds models.Bounds) Block { | |||
} | |||
} | |||
|
|||
// Unconsolidated returns the unconsolidated version for the block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit ...of the block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ben had the same comment. We use "for the block" almost everywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's grammatically correct almost everywhere, just not here 😛
src/query/block/types.go
Outdated
return math.NaN() | ||
} | ||
|
||
return values[len(values)-1].Value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might need to be a little more complex; rather than taking the last value it should take the last non-NaN value
src/query/server/server.go
Outdated
select { | ||
case <-sigChan: | ||
case <-interruptCh: | ||
var interruptErr error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might need to rebase?
func (b *fixedResolutionValues) AlignToBounds(_ models.Bounds) []Datapoints { | ||
values := make([]Datapoints, len(b.values)) | ||
for i := 0; i < b.Len(); i++ { | ||
values[i] = Datapoints{b.DatapointAt(i)} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
stepSize := bound.StepSize()
...
Datapoints{
Timestamp: bound.Start().Add(time.Duration(i) * stepSize)
Value: b.DatapointAt(i)
}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a little cleaner since Datapoint already does this logic
} | ||
|
||
// Consolidated consolidates the series | ||
func (s UnconsolidatedSeries) Consolidated(consolidationFunc ConsolidationFunc) Series { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to Consolidate
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's just talking about a view. Maybe we already have it cached and don't really consolidate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like Consolidate
@@ -36,6 +36,11 @@ type columnBlock struct { | |||
seriesMeta []SeriesMeta | |||
} | |||
|
|||
// Unconsolidated returns the unconsolidated version for the block |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: of the block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use for the block at most places. Can do a search and replace in a separate diff
return math.NaN() | ||
} | ||
|
||
allNaNs := true | ||
result := 0.0 | ||
prev := values[0] | ||
prev := datapoints[0].Value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any chance these can be nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we already have a length check ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean datapoints[0] could be nil, so you might hit NPE when you call .Value
func seriesValuesToDatapoints(values []float64, bounds models.Bounds) ts.Datapoints { | ||
dps := make(ts.Datapoints, len(values)) | ||
for i, v := range values { | ||
t, _ := bounds.TimeForIndex(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad didn't catch these were test utilities
"github.com/m3db/m3/src/query/block" | ||
) | ||
|
||
type consolidatedBlock struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: consolidatedBlock is a bit of a weird name for something containing an unconsolidated block and a consolidation block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't have consolidation block just a consolidatationFunc
src/query/ts/values.go
Outdated
dpIdx := 0 | ||
for i := 0; i < steps; i++ { | ||
startIdx := dpIdx | ||
t, _ := bounds.TimeForIndex(i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then might be a bit better to do the calculation here explicitly to avoid the overhead of the bounds checking in .TimeForIndex()?
@@ -20,6 +20,10 @@ | |||
|
|||
package block | |||
|
|||
import ( | |||
"github.com/m3db/m3/src/query/ts" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use "github.com/m3db/m3/src/dbnode/ts"
and delete this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally we want to avoid any dbnode things especially coupling their series representation with ours. We'd probably move anything m3db related within ts/m3db and localize it there
b9bc69e
to
a373c73
Compare
// TakeLast is a consolidation function which takes the last datapoint which has non nan value | ||
func TakeLast(values ts.Datapoints) float64 { | ||
for i := len(values) - 1; i >= 0; i-- { | ||
if !math.IsNaN(values[i].Value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
} | ||
|
||
m.consolidated = consolidated | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
|
||
// AlignToBounds returns values aligned to given bounds. | ||
func (d Datapoints) AlignToBounds(bounds models.Bounds) []Datapoints { | ||
numDatapoints := d.Len() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe put these in a var()
?
var (
numDatapoints = d.Len()
steps = bounds.Steps()
stepValues = make([]Datapoints, steps)
dpIdx = 0
stepSize = bounds.StepSize
t = bounds.Start
)
dpIdx := 0 | ||
stepSize := bounds.StepSize | ||
t := bounds.Start | ||
for i := 0; i < steps; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
@@ -98,6 +138,17 @@ func (b *fixedResolutionValues) DatapointAt(point int) Datapoint { | |||
} | |||
} | |||
|
|||
// AlignToBounds returns values aligned to given bounds. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kind of weird to call this AlignToBounds
, but you just disregard the bounds..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple nits, but LGTM
src/query/storage/consolidated.go
Outdated
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
* THE SOFTWARE. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This license text comment should be //
prefixed per line like the other .go
files.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ran this on a preprod host and the missing metrics we were seeing on master are resolved :)
Fix lint Remove unused mock Revert "Remove unused mock" This reverts commit 2ad6e95. Update mock
a373c73
to
1320a86
Compare
No description provided.