Temporal function which can span across time boundaries #811
Codecov Report
@@ Coverage Diff @@
## master #811 +/- ##
=========================================
- Coverage 78.41% 78.3% -0.11%
=========================================
Files 381 384 +3
Lines 32712 33060 +348
=========================================
+ Hits 25651 25889 +238
- Misses 5314 5390 +76
- Partials 1747 1781 +34
src/coordinator/block/types.go
Outdated
type Bounds struct {
	Start    time.Time
	End      time.Time
	Duration time.Duration
	StepSize time.Duration
}

// TimeForIndex returns the start time for a given index assuming a uniform step size
func (b Bounds) TimeForIndex(idx int) (time.Time, error) {

Maybe this should return (time.Time, bool), so that an out-of-bounds index is reported with a true/false instead of an error.
src/coordinator/block/types.go
Outdated
	start := b.Start.Add(blockDuration * time.Duration(n*multiplier))
	return Bounds{
		Start:    start,
		Duration: blockDuration,

Can we just do Duration: b.Duration?

b.Duration was being used twice, so I just extracted it into a variable.
src/coordinator/block/types_test.go
Outdated
	}
	assert.Equal(t, bounds.Steps(), 0)
	_, err := bounds.TimeForIndex(0)
	assert.Error(t, err, "No valid index in this block")

nit: s/No/no
src/coordinator/block/types_test.go
Outdated
	}
	assert.Equal(t, bounds.Steps(), 0)
	_, err = bounds.TimeForIndex(0)
	assert.Error(t, err, "No valid index in this block")

same as above
src/coordinator/block/types_test.go
Outdated
@@ -0,0 +1,71 @@
// Copyright (c) 2018 Uber Technologies, Inc.

Want to test Next() and/or Previous()?
	defer c.mu.Unlock()
	_, ok := c.blocks[fromTime(key)]
	if ok {
		return errors.New("block already exists")

Does this need to error? If it's already in there, isn't that fine? If you need to, can we just return a bool instead?

A block that is already there is probably a bug, so an error helps us identify it.
	if ok {
		blks[i] = b
	}

nit: newline
src/coordinator/functions/fetch.go
Outdated
	// Ignore any errors
	iter, _ := block.StepIter()
	if iter != nil {
		fmt.Printf("[fetch node]: meta for the block: %v\n", iter.Meta())

???
	transformOpts transform.Options
}

// Process processes a block. The processing steps are as follows:

Nice comment!
// 3. For the blocks after current block, figure out which can be processed right now
// 4. Process all valid blocks from #3, #4 and mark them as processed
// 5. Run a sweep phase to free up blocks which no longer need to be cached
func (c *baseNode) Process(ID parser.NodeID, b block.Block) error {

Could you add some comments in the code explaining what each section is doing? I think that'd be very helpful.
	}

	builder.AppendValue(i, newVal)

nit: remove newline
		values = append(values, s.Values()...)
	}

	desiredLength := int(aggDuration / bounds.StepSize)

Please add some comments in this code.
	deps := leftBlks[len(leftBlks)-lStart:]
	deps = append(deps, rightBlks[:i]...)
	processRequests = append(processRequests, processRequest{blk: rightBlks[i], deps: deps, bounds: bounds.Next(i + 1)})

nit: remove newline
src/coordinator/ts/series.go
Outdated
@@ -69,7 +69,7 @@ func alignValues(values Values, start, end time.Time, interval time.Duration) (F
 	case Datapoints:
 		return RawPointsToFixedStep(vals, start, end, interval)
 	case FixedResolutionMutableValues:
-		// TODO: Align fixed resolution as well once storages can return those directly
+		// TODO: NearestStart fixed resolution as well once storages can return those directly

nit: Comment mishap?
src/coordinator/block/types.go
Outdated
type Bounds struct {
	Start    time.Time
	End      time.Time
	Duration time.Duration

nit: To mirror m3db, might be better to call this BlockSize?
	}

	rightRangeStart := bounds.Next(maxBlocks).Start
	queryEndBounds := bounds.Nearest(c.transformOpts.TimeSpec.End.Add(-1 * bounds.StepSize))

nit: End.Sub(bounds.StepSize)

Actually, the reason we do it this way is that Sub takes a time.Time (and returns a Duration), whereas Add takes a Duration.
	endInclusiveVal := r.FormValue(endInclusiveParam)
	params.IncludeEnd = true
	if endInclusiveVal != "" {
		includeEnd, err := strconv.ParseBool(r.FormValue(endInclusiveParam))

nit: use endInclusiveVal instead of getting it from r again
		if err != nil {
			logging.WithContext(r.Context()).Warn("unable to parse end inclusive flag", zap.Any("error", err))
		}
		params.IncludeEnd = includeEnd

nit: newline
// Processed returns all processed block times from the cache
func (c *TimeCache) Processed() map[time.Time]bool {
	c.mu.Lock()
	defer c.mu.Unlock()

Rather than incurring the cost of defer here, unlock manually
	}

	// Process left side of the range
	leftBlks, emptyLeftBlocks := c.processLeft(b, bounds, maxBlocks, leftRangeStart)

nit: this would read better if the bool returned by processLeft is true iff required blocks are present, rather than the inverse

Simplified some logic now, let me know how it reads.
		leftRangeTimes = append(leftRangeTimes, t)
	}

	leftBlks := c.cache.MultiGet(leftRangeTimes)

Would it be better to push some logic down to MultiGet, where it short-circuits out on a missing cache value, then returns the partial list and false, rather than doing additional processing here and in processRight?

Yeah, sounds good.
	return rightBlks[:firstNil], firstNil != len(rightBlks)
}

// processCompletedBlocks processes all blocks for which are dependant blocks are present

nit: ... for which all dependent...
	// Mark all blocks as processed
	c.cache.MarkProcessed(processedKeys)
	// Sweep to free blocks from cache with no dependencies
	c.sweep(c.cache.Processed(), queryStartBounds, queryEndBounds, maxBlocks)

Does sweep need to take in cache.Processed() if it's a method on c already?

Keeping sweep simple?
src/coordinator/models/tag.go
Outdated
@@ -214,3 +214,16 @@ func (t Tags) sortKeys() ([]string, int) {
 	sort.Strings(keys)
 	return keys, length
 }
+
+func (t Tags) WithoutName() Tags {

nit: add a comment

No longer part of the diff.
b64f8ae to d2da696
@@ -45,6 +45,7 @@ const (
 	targetParam = "target"
 	stepParam = "step"
 	debugParam = "debug"
+	endInclusiveParam = "end-inclusive"

nit: since including is the default, flip this to excludeEndParam?
@@ -150,6 +163,12 @@ func renderResultsJSON(w io.Writer, series []*ts.Series) {
 		vals := s.Values()
 		for i := 0; i < s.Len(); i++ {
 			dp := vals.DatapointAt(i)
+			// Skip points before the query boundary. Ideal place to adjust these would be at the result node but that would make it inefficient
+			// since we would need to create another block just for the sake of restricting the bounds
+			if dp.Timestamp.Before(params.Start) {

Rather than skipping in this loop, could ts.Series keep the Bounds of the block it was generated from? Then we could have something like this on Bounds:

// IndexAtTime returns the first index at or after a given time;
// returns 0 if t < start, and -1 if t > end
func (b Bounds) IndexAtTime(t time.Time) int {
	start := b.Start
	if t.Before(start) {
		return 0
	}
	if t.After(b.End) {
		return -1
	}
	return int(math.Ceil(float64(t.Sub(start)) / float64(b.StepSize)))
}

and use for i := s.b.IndexAtTime(params.Start) here.

The series bounds are always the same as the bounds of the block it's generated from. The problem is that we sometimes have to fetch blocks for a longer query duration so that some functions can look back far enough. However, when we return, we want to skip the excess points.

I get the motivation, but at the moment it may iterate through a bunch of unnecessary data points; better to calculate the first valid i to start from, rather than trying a bunch and discarding. Not really a big deal with a couple of series, but with something like 10,000 series where only the last few datapoints in a 2-hour block are required, we'll be doing a lot of unnecessary looping.

Hmm, I see. I can solve that another way. Since each series has the same start, I can figure out the actual start index for the first series and use that for all the others.
 }

 // Steps calculates the number of steps for the bounds
 func (b Bounds) Steps() int {
-	if b.Start.After(b.End) || b.StepSize <= 0 {
+	if b.StepSize <= 0 {

Should the case where StepSize is 0 error out earlier, when we build the bounds in the first place, rather than continuing here which could give weird results?

I think only TimeForIndex doesn't make sense with a 0 step size; other things should work. In theory, we can use this for things without a fixed resolution? I was thinking about moving the bounds outside the block package and into models.
src/query/block/types.go
Outdated
// Contains returns whether the time lies between the bounds.
func (b Bounds) Contains(t time.Time) bool {
	return !b.Start.After(t) && b.Start.Add(b.Duration).After(t)

nit: might read a little cleaner as

diff := t.Sub(b.Start)
return diff >= 0 && diff < b.Duration
src/query/block/types.go
Outdated
}

func (b Bounds) nth(n int, forward bool) Bounds {
	multiplier := 1

nit: might read a little cleaner as

multiplier := time.Duration(n)
if !forward {
	multiplier *= -1
}
src/query/functions/temporal/base.go
Outdated
// MarkProcessed is used to mark a block as processed
func (c *blockCache) markProcessed(keys []time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()

Unlock manually instead
src/query/functions/temporal/base.go
Outdated
		}
	}

	reversed(blks)

Can we use sort.Reverse(blks)?

We don't want the blocks in descending sorted order; this is just about reversing the array.

sort.Reverse just reverses the list, shouldn't do any sorting

I don't think that's correct. Check out https://golang.org/pkg/sort/#Reverse; they have an example there. Reverse needs Len(), a comparator (Less), etc., and is for sorting the list in reverse order.

Fair enough; weird that there's no inbuilt reverse array method
src/query/functions/temporal/base.go
Outdated
// processCompletedBlocks processes all blocks for which are dependant blocks are present
func (c *baseNode) processCompletedBlocks(processRequests []processRequest, queryStartBounds, queryEndBounds block.Bounds, maxBlocks int) error {
	processedKeys := make([]time.Time, len(processRequests))

Should this lock the mutex? Similar for processLeft and processRight

markProcessed and sweep are the only ones mutating the blocks. markProcessed is already within a mutex, and I'm not sure sweep needs one, since it's fine for sweep to miss a few blocks; they can be swept later on. I'm also not sure processLeft and processRight need a mutex, since multiGet is already locked. We might have to revisit this later.
src/query/functions/temporal/base.go
Outdated
	values := make([]float64, 0, steps)

	seriesMeta := seriesIter.SeriesMeta()
	resultSeriesMeta := make([]block.SeriesMeta, len(seriesMeta))

Shouldn't resultSeriesMeta be the same as incoming seriesMeta? How do you combine seriesMetas between multiple blocks if a particular series does not exist in a particular block?

In resultSeriesMeta we just need to remove the series name (same as Prom); the rest is the same. We assume all blocks have the same series. If a block doesn't, then that series should be all NaNs for that time range.

Oh does prom drop name here? Weird...
"github.com/stretchr/testify/require" | ||
)

type processor struct {

Can we get more tests for cases where we have multiple series per block, also where series are missing from some blocks, etc.?

I don't think series can be missing from some blocks. In that case, they should just be NaNs. I'll add tests for processSingleRequest.

Seems inefficient to jam in a bunch of unnecessary NaNs for series that don't exist in blocks just to make processing a little easier; could add up to a lot of unneeded datapoints for sparse series

I think that depends on the iterator, but it's probably just going to return NaNs and not actually store them.
@@ -56,6 +59,14 @@ func (o FetchOp) OpType() string {
 	return FetchType

Should we make FetchOp and FetchNode private similar to other Op/Nodes?

Yeah! Don't want to do it in this diff though.

Mind adding a TODO just in case?
src/query/functions/temporal/base.go
Outdated
	for i, j := 0, len(blocks)-1; i < j; i, j = i+1, j-1 {
		blocks[i], blocks[j] = blocks[j], blocks[i]
	}

nit: newline
src/query/functions/temporal/base.go
Outdated
	return blks, nil
}

func reversed(blocks []block.Block) {

nit: rename to reverse
src/query/functions/temporal/base.go
Outdated
	}

	// Process a single index
	process := func(i int) (bool, error) {

nit: rename to addIfPresent, maybe add a comment as to what this does. Also, flip to return false if not added, and true if added, since true usually corresponds to the positive case

I made the name clearer and added a comment. I still think it reads better with empty; let me know once you've read the new comment/naming.
src/query/functions/temporal/base.go
Outdated
	defer c.mu.Unlock()

	blks := make([]block.Block, 0, numBlocks)
	if numBlocks == 0 {

nit: can do this before the mutex lock
src/query/functions/temporal/base.go
Outdated
	}

	for seriesIter.Next() {
		values = values[0:0]

nit: can do values[:0] instead?
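For context, values[:0] and values[0:0] are equivalent: both reset the length to zero while keeping the backing array, so the buffer is reused across iterations. A toy sketch of the pattern follows; the sumRows helper is invented for the example and is not part of the PR.

```go
package main

import "fmt"

// sumRows sums each row using one scratch buffer that is reset,
// not reallocated, on every iteration.
func sumRows(rows [][]float64) []float64 {
	values := make([]float64, 0, 4)
	sums := make([]float64, 0, len(rows))
	for _, row := range rows {
		values = values[:0] // length 0, same backing array and capacity
		values = append(values, row...)
		total := 0.0
		for _, v := range values {
			total += v
		}
		sums = append(sums, total)
	}
	return sums
}

func main() {
	fmt.Println(sumRows([][]float64{{1, 2}, {3}, {4, 5, 6}}))
}
```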
src/query/functions/temporal/base.go
Outdated
	}

	bounds := seriesIter.Meta().Bounds
	steps := int((aggDuration + bounds.Duration) / bounds.StepSize)

Add stepSize==0 sanity check; also can define this closer to where it's needed

This should be handled by the sanity check on bounds in Process().
src/query/functions/temporal/base.go
Outdated
		if err != nil {
			return err
		}
		depIters[i] = iter

nit: newline
src/query/functions/temporal/base.go
Outdated
}

func (c *baseNode) processSingleRequest(request processRequest) error {
	aggDuration := c.op.duration

nit: define closer to where it's needed
Fix rebase issues
Reduce scopes, add comments
Implement count_over_time
Fix query ranges to align with prom, add debugging
Address comments overtime
)

// FetchType gets the series from storage
const FetchType = "fetch"

// FetchOp stores required properties for fetch
// TODO: Make FetchOp private

👍

Approved pending comments on TestSingleProcessRequest