From 4cc991be60468367ed94611fd49e0504e6cdc7e4 Mon Sep 17 00:00:00 2001 From: Andrew Mains Date: Tue, 22 Jan 2019 11:11:37 -0500 Subject: [PATCH] Fix tag propagation for temporal functions (#1307) Temporal functions don't properly propagate series tags (thanks to @arnikola for suggesting this as the bug!). For instance, given series: ``` coordinator_engine_datapoints{type="fetched"} 1386 coordinator_engine_datapoints{type="generated"} 104 ``` a query like increase(coordinator_engine_datapoints[5s]) will return ``` { "__name__": "coordinator_engine_datapoints", "instance": "host.docker.internal:7203", "job": "coordinator", "role": "remote" } { "__name__": "coordinator_engine_datapoints", "instance": "host.docker.internal:7203", "job": "coordinator", "role": "remote" } ``` dropping tags. Querying the same range without increase gives all tags, as expected. Fix is simple; we weren't copying tags into the new block's SeriesMetas; now we do. --- src/query/functions/temporal/base.go | 8 +- src/query/functions/temporal/base_test.go | 388 +++++++++------------- 2 files changed, 157 insertions(+), 239 deletions(-) diff --git a/src/query/functions/temporal/base.go b/src/query/functions/temporal/base.go index 01d263211a..e8db747242 100644 --- a/src/query/functions/temporal/base.go +++ b/src/query/functions/temporal/base.go @@ -254,10 +254,16 @@ func (c *baseNode) processSingleRequest(request processRequest) error { bounds := seriesIter.Meta().Bounds seriesMeta := seriesIter.SeriesMeta() + + // rename series to exclude their __name__ tag + // TODO: why do we do this? resultSeriesMeta := make([]block.SeriesMeta, len(seriesMeta)) for i, m := range seriesMeta { tags := m.Tags.WithoutName() - resultSeriesMeta[i].Name = tags.ID() + resultSeriesMeta[i] = block.SeriesMeta{ + Name: tags.ID(), + Tags: tags, + } } builder, err := c.controller.BlockBuilder(seriesIter.Meta(), resultSeriesMeta) diff --git a/src/query/functions/temporal/base_test.go b/src/query/functions/temporal/base_test.go index b6b5c11c2e..9600340c94 100644 --- a/src/query/functions/temporal/base_test.go +++ b/src/query/functions/temporal/base_test.go @@ -53,10 +53,10 @@ func (p *processor) Process(dps ts.Datapoints, _ time.Time) float64 { return sum } -func compareCacheState(t *testing.T, bNode *baseNode, bounds models.Bounds, state []bool, debugMsg string) { +func compareCacheState(t *testing.T, node *baseNode, bounds models.Bounds, state []bool, debugMsg string) { actualState := make([]bool, len(state)) for i := range state { - _, exists := bNode.cache.get(bounds.Next(i).Start) + _, exists := node.cache.get(bounds.Next(i).Start) actualState[i] = exists } @@ -121,351 +121,255 @@ func TestBaseWithB0(t *testing.T) { } func TestBaseWithB1B0(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 2) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } + tc := setup(2, 5*time.Minute, 1) - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(1).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) - err := node.Process(parser.NodeID(0), blocks[1]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 0, "nothing processed yet") - compareCacheState(t, bNode, bounds, []bool{false, true}, "B1 cached") + assert.Len(t, tc.Sink.Values, 0, "nothing processed yet") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true}, "B1 cached") - err = node.Process(parser.NodeID(0), blocks[0]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 4, "output from both blocks") - compareCacheState(t, bNode, bounds, []bool{false, false}, "everything removed from cache") - blks, err := bNode.cache.multiGet(bounds, 2, false) + assert.Len(t, tc.Sink.Values, 4, "output from both blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false}, "everything removed from cache") + blks, err := tc.Node.cache.multiGet(tc.Bounds, 2, false) require.NoError(t, err) assert.Len(t, blks, 0) } func TestBaseWithB0B1(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 2) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } + tc := setup(2, 5*time.Minute, 1) + + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(1).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) - err := node.Process(parser.NodeID(0), blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B0 processed") - compareCacheState(t, bNode, bounds, []bool{true, false}, "B0 cached for future") + assert.Len(t, tc.Sink.Values, 2, "B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false}, "B0 cached for future") - err = node.Process(parser.NodeID(0), blocks[1]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 4, "output from both blocks") - compareCacheState(t, bNode, bounds, []bool{false, false}, "B0 removed from cache, B1 not cached") - blks, err := bNode.cache.multiGet(bounds, 2, false) + assert.Len(t, tc.Sink.Values, 4, "output from both blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false}, "B0 removed from cache, B1 not cached") + blks, err := tc.Node.cache.multiGet(tc.Bounds, 2, false) require.NoError(t, err) assert.Len(t, blks, 0) } func TestBaseWithB0B1B2(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 3) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(2).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(3, 5*time.Minute, 2) // B0 arrives - err := node.Process(parser.NodeID(0), blocks[0]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B0 processed") - compareCacheState(t, bNode, bounds, []bool{true, false, false}, "B0 cached for future") + assert.Len(t, tc.Sink.Values, 2, "B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false}, "B0 cached for future") // B1 arrives - err = node.Process(parser.NodeID(0), blocks[1]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 4, "output from B0, B1") - compareCacheState(t, bNode, bounds, []bool{false, true, false}, "B0 removed from cache, B1 cached") + assert.Len(t, tc.Sink.Values, 4, "output from B0, B1") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B0 removed from cache, B1 cached") // B2 arrives - err = node.Process(parser.NodeID(0), blocks[2]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "output from all blocks") - compareCacheState(t, bNode, bounds, []bool{false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 6, "output from all blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") } func TestBaseWithB0B2B1(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 3) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(2).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(3, 5*time.Minute, 2) // B0 arrives - err := node.Process(parser.NodeID(0), blocks[0]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B0 processed") - compareCacheState(t, bNode, bounds, []bool{true, false, false}, "B0 cached for future") + assert.Len(t, tc.Sink.Values, 2, "B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false}, "B0 cached for future") // B2 arrives - err = node.Process(parser.NodeID(0), blocks[2]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "Only B0 processed") - compareCacheState(t, bNode, bounds, []bool{true, false, true}, "B0, B2 cached") + assert.Len(t, tc.Sink.Values, 2, "Only B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, true}, "B0, B2 cached") // B1 arrives - err = node.Process(parser.NodeID(0), blocks[1]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "output from all blocks") - compareCacheState(t, bNode, bounds, []bool{false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 6, "output from all blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") } func TestBaseWithB1B0B2(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 3) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(2).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(3, 5*time.Minute, 2) // B1 arrives - err := node.Process(parser.NodeID(0), blocks[1]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 0, "Nothing processed") - compareCacheState(t, bNode, bounds, []bool{false, true, false}, "B1 cached for future") + assert.Len(t, tc.Sink.Values, 0, "Nothing processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached for future") // B0 arrives - err = node.Process(parser.NodeID(0), blocks[0]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 4, "B0, B1 processed") - compareCacheState(t, bNode, bounds, []bool{false, true, false}, "B1 still cached, B0 not cached") + assert.Len(t, tc.Sink.Values, 4, "B0, B1 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 still cached, B0 not cached") // B2 arrives - err = node.Process(parser.NodeID(0), blocks[2]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "output from all blocks") - compareCacheState(t, bNode, bounds, []bool{false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 6, "output from all blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") } func TestBaseWithB1B2B0(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 3) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(2).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(3, 5*time.Minute, 2) // B1 arrives - err := node.Process(parser.NodeID(0), blocks[1]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 0, "Nothing processed") - compareCacheState(t, bNode, bounds, []bool{false, true, false}, "B1 cached for future") + assert.Len(t, tc.Sink.Values, 0, "Nothing processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached for future") // B2 arrives - err = node.Process(parser.NodeID(0), blocks[2]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B1 processed") - compareCacheState(t, bNode, bounds, []bool{false, true, false}, "B1 still cached, B2 not cached") + assert.Len(t, tc.Sink.Values, 2, "B1 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 still cached, B2 not cached") // B0 arrives - err = node.Process(parser.NodeID(0), blocks[0]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "output from all blocks") - compareCacheState(t, bNode, bounds, []bool{false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 6, "output from all blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") } func TestBaseWithB2B0B1(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 3) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(2).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(3, 5*time.Minute, 2) // B2 arrives - err := node.Process(parser.NodeID(0), blocks[2]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 0, "Nothing processed") - compareCacheState(t, bNode, bounds, []bool{false, false, true}, "B2 cached for future") + assert.Len(t, tc.Sink.Values, 0, "Nothing processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, true}, "B2 cached for future") // B0 arrives - err = node.Process(parser.NodeID(0), blocks[0]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B0 processed") - compareCacheState(t, bNode, bounds, []bool{true, false, true}, "B0, B2 cached") + assert.Len(t, tc.Sink.Values, 2, "B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, true}, "B0, B2 cached") // B1 arrives - err = node.Process(parser.NodeID(0), blocks[1]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "output from all blocks") - compareCacheState(t, bNode, bounds, []bool{false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 6, "output from all blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") } func TestBaseWithB2B1B0(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 3) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 5 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(2).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(3, 5*time.Minute, 2) // B2 arrives - err := node.Process(parser.NodeID(0), blocks[2]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 0, "Nothing processed") - compareCacheState(t, bNode, bounds, []bool{false, false, true}, "B2 cached for future") + assert.Len(t, tc.Sink.Values, 0, "Nothing processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, true}, "B2 cached for future") // B1 arrives - err = node.Process(parser.NodeID(0), blocks[1]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B0 processed") - compareCacheState(t, bNode, bounds, []bool{false, true, false}, "B1 cached, B2 removed") + assert.Len(t, tc.Sink.Values, 2, "B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, false}, "B1 cached, B2 removed") // B0 arrives - err = node.Process(parser.NodeID(0), blocks[0]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "output from all blocks") - compareCacheState(t, bNode, bounds, []bool{false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 6, "output from all blocks") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false}, "nothing cached") } func TestBaseWithSize3B0B1B2B3B4(t *testing.T) { - values, bounds := test.GenerateValuesAndBounds(nil, nil) - blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, 5) - c, sink := executor.NewControllerWithSink(parser.NodeID(1)) - baseOp := baseOp{ - operatorType: "dummy", - duration: 15 * time.Minute, - processorFn: processor{}, - } - - node := baseOp.Node(c, transform.Options{ - TimeSpec: transform.TimeSpec{ - Start: bounds.Start, - End: bounds.Next(4).End(), - Step: time.Second, - }, - }) - bNode := node.(*baseNode) + tc := setup(5, 15*time.Minute, 4) // B0 arrives - err := node.Process(parser.NodeID(0), blocks[0]) + err := tc.Node.Process(parser.NodeID(0), tc.Blocks[0]) require.NoError(t, err) - assert.Len(t, sink.Values, 2, "B0 processed") - compareCacheState(t, bNode, bounds, []bool{true, false, false, false, false}, "B0 cached for future") + assert.Len(t, tc.Sink.Values, 2, "B0 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, false, false, false, false}, "B0 cached for future") // B1 arrives - err = node.Process(parser.NodeID(0), blocks[1]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[1]) require.NoError(t, err) - assert.Len(t, sink.Values, 4, "B0, B1 processed") - compareCacheState(t, bNode, bounds, []bool{true, true, false, false, false}, "B0, B1 cached") + assert.Len(t, tc.Sink.Values, 4, "B0, B1 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, true, false, false, false}, "B0, B1 cached") // B2 arrives - err = node.Process(parser.NodeID(0), blocks[2]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[2]) require.NoError(t, err) - assert.Len(t, sink.Values, 6, "B0, B1, B2 processed") - compareCacheState(t, bNode, bounds, []bool{true, true, true, false, false}, "B0, B1, B2 cached") + assert.Len(t, tc.Sink.Values, 6, "B0, B1, B2 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{true, true, true, false, false}, "B0, B1, B2 cached") // B3 arrives - err = node.Process(parser.NodeID(0), blocks[3]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[3]) require.NoError(t, err) - assert.Len(t, sink.Values, 8, "B0, B1, B2, B3 processed") - compareCacheState(t, bNode, bounds, []bool{false, true, true, true, false}, "B0 removed, B1, B2, B3 cached") + assert.Len(t, tc.Sink.Values, 8, "B0, B1, B2, B3 processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, true, true, true, false}, "B0 removed, B1, B2, B3 cached") // B4 arrives - err = node.Process(parser.NodeID(0), blocks[4]) + err = tc.Node.Process(parser.NodeID(0), tc.Blocks[4]) require.NoError(t, err) - assert.Len(t, sink.Values, 10, "all 5 blocks processed") - compareCacheState(t, bNode, bounds, []bool{false, false, false, false, false}, "nothing cached") + assert.Len(t, tc.Sink.Values, 10, "all 5 blocks processed") + compareCacheState(t, tc.Node, tc.Bounds, []bool{false, false, false, false, false}, "nothing cached") +} + +type testContext struct { + Bounds models.Bounds + Blocks []block.Block + Sink *executor.SinkNode + Node *baseNode +} + +func setup(numBlocks int, duration time.Duration, nextBound int) *testContext { + values, bounds := test.GenerateValuesAndBounds(nil, nil) + blocks := test.NewMultiUnconsolidatedBlocksFromValues(bounds, values, test.NoopMod, numBlocks) + c, sink := executor.NewControllerWithSink(parser.NodeID(1)) + baseOp := baseOp{ + operatorType: "dummy", + duration: duration, + processorFn: processor{}, + } + node := baseOp.Node(c, transform.Options{ + TimeSpec: transform.TimeSpec{ + Start: bounds.Start, + End: bounds.Next(nextBound).End(), + Step: time.Second, + }, + }) + return &testContext{ + Bounds: bounds, + Blocks: blocks, + Sink: sink, + Node: node.(*baseNode), + } } func TestSingleProcessRequest(t *testing.T) { values, bounds := test.GenerateValuesAndBounds(nil, nil) boundStart := bounds.Start - b := test.NewUnconsolidatedBlockFromDatapoints(bounds, values) + + seriesMetas := []block.SeriesMeta{{ + Name: "s1", + Tags: models.EmptyTags().AddTags([]models.Tag{{ + Name: []byte("t1"), + Value: []byte("v1"), + }})}, { + Name: "s2", + Tags: models.EmptyTags().AddTags([]models.Tag{{ + Name: []byte("t1"), + Value: []byte("v2"), + }}), + }} + + b := test.NewUnconsolidatedBlockFromDatapointsWithMeta(bounds, seriesMetas, values) block2, _ := b.Unconsolidated() values = [][]float64{{10, 11, 12, 13, 14}, {15, 16, 17, 18, 19}} @@ -475,7 +379,7 @@ func TestSingleProcessRequest(t *testing.T) { StepSize: bounds.StepSize, } - b = test.NewUnconsolidatedBlockFromDatapoints(block1Bounds, values) + b = test.NewUnconsolidatedBlockFromDatapointsWithMeta(block1Bounds, seriesMetas, values) block1, _ := b.Unconsolidated() c, sink := executor.NewControllerWithSink(parser.NodeID(1)) @@ -502,4 +406,12 @@ func TestSingleProcessRequest(t *testing.T) { // i = 1; prev values [12, 13, 14, 15], current values [0, 1], sum = 40 assert.Equal(t, sink.Values[0], []float64{50, 40, 30, 20, 10}, "first series is 10 - 14 which sums to 60, the current block first series is 0-4 which sums to 10, we need 5 values per aggregation") assert.Equal(t, sink.Values[1], []float64{75, 65, 55, 45, 35}, "second series is 15 - 19 which sums to 85 and second series is 5-9 which sums to 35") + + // processSingleRequest renames the series to use their ids; reflect this in our expectation. + expectedSeriesMetas := make([]block.SeriesMeta, len(seriesMetas)) + require.Equal(t, len(expectedSeriesMetas), copy(expectedSeriesMetas, seriesMetas)) + expectedSeriesMetas[0].Name = "t1=v1," + expectedSeriesMetas[1].Name = "t1=v2," + + assert.Equal(t, expectedSeriesMetas, sink.Metas, "Process should pass along series meta, renaming to the ID") }