Points with different number of columns from multiple shards return incorrect result #392

Merged: 2 commits, Apr 9, 2014
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
- [Issue #413](https://github.com/influxdb/influxdb/issues/413). Don't assume that group by interval is greater than a second
- [Issue #415](https://github.com/influxdb/influxdb/issues/415). Include the database when sending an auth error back to the user
- [Issue #421](https://github.com/influxdb/influxdb/issues/421). Make read timeout a config option
- [Issue #392](https://github.com/influxdb/influxdb/issues/392). Different columns in different shards return invalid results when a query spans those shards

### Bugfixes

2 changes: 1 addition & 1 deletion src/api/http/api.go
@@ -214,7 +214,7 @@ func (self *AllPointsWriter) yield(series *protocol.Series) error {
		return nil
	}

	oldSeries.Points = append(oldSeries.Points, series.Points...)
	self.memSeries[series.GetName()] = MergeSeries(self.memSeries[series.GetName()], series)
	return nil
}

79 changes: 79 additions & 0 deletions src/common/merge_series.go
@@ -0,0 +1,79 @@
package common

import (
	"protocol"
	"reflect"
)

func pointMaps(s *protocol.Series) (result []map[string]*protocol.FieldValue) {
	for _, p := range s.Points {
		pointMap := map[string]*protocol.FieldValue{}
		for idx, value := range p.Values {
			pointMap[s.Fields[idx]] = value
		}
		result = append(result, pointMap)
	}
	return
}

// MergeSeries merges two time series, making sure that the resulting series
// has the union of the two series' columns and that the values are set
// properly. It will panic if the two series don't have the same name.
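// For example (illustrative values only): merging a series with Fields
// ["value"] into one with Fields ["value", "host"] yields a series whose
// Fields cover both columns, with IsNull markers filled in for points
// that have no value for a given column.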
func MergeSeries(s1, s2 *protocol.Series) *protocol.Series {
	if s1.GetName() != s2.GetName() {
		panic("the two series don't have the same name")
	}

	// if the two series have the same columns in the same order,
	// just append the points and return.
	if reflect.DeepEqual(s1.Fields, s2.Fields) {
		s1.Points = append(s1.Points, s2.Points...)
		return s1
	}

	columns := map[string]struct{}{}

	for _, cs := range [][]string{s1.Fields, s2.Fields} {
		for _, c := range cs {
			columns[c] = struct{}{}
		}
	}

	points := append(pointMaps(s1), pointMaps(s2)...)

	fieldsSlice := make([]string, 0, len(columns))
	for c := range columns {
		fieldsSlice = append(fieldsSlice, c)
	}

	resultPoints := make([]*protocol.Point, 0, len(points))
	for idx, point := range points {
		resultPoint := &protocol.Point{}
		for _, field := range fieldsSlice {
			value := point[field]
			if value == nil {
				value = &protocol.FieldValue{
					IsNull: &TRUE,
				}
			}
			resultPoint.Values = append(resultPoint.Values, value)
			if idx < len(s1.Points) {
				resultPoint.Timestamp = s1.Points[idx].Timestamp
				resultPoint.SequenceNumber = s1.Points[idx].SequenceNumber
			} else {
				resultPoint.Timestamp = s2.Points[idx-len(s1.Points)].Timestamp
				resultPoint.SequenceNumber = s2.Points[idx-len(s1.Points)].SequenceNumber
			}
		}
		resultPoints = append(resultPoints, resultPoint)
	}

	// merge the columns of both series into the result
	result := &protocol.Series{
		Name:   s1.Name,
		Fields: fieldsSlice,
		Points: resultPoints,
	}

	return result
}
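To illustrate what the new file does, here is a minimal, self-contained sketch of the column-union merge. The Series and FieldValue types below are simplified stand-ins for the generated protocol types (illustration only, not part of this change), and unlike the real MergeSeries, which takes its column order from map iteration, this sketch keeps a deterministic order.

package main

import "fmt"

type FieldValue struct {
	DoubleValue *float64
	IsNull      bool
}

type Series struct {
	Fields []string
	Points [][]*FieldValue // one slice of values per point, aligned with Fields
}

// mergeColumns mirrors the idea behind common.MergeSeries: build the union of
// the two field sets and re-emit every point against that union, inserting
// null placeholders where a point has no value for a column.
func mergeColumns(s1, s2 Series) Series {
	union := append([]string{}, s1.Fields...)
	seen := map[string]bool{}
	for _, f := range s1.Fields {
		seen[f] = true
	}
	for _, f := range s2.Fields {
		if !seen[f] {
			union = append(union, f)
			seen[f] = true
		}
	}

	out := Series{Fields: union}
	for _, src := range []Series{s1, s2} {
		index := map[string]int{}
		for i, f := range src.Fields {
			index[f] = i
		}
		for _, p := range src.Points {
			row := make([]*FieldValue, len(union))
			for i, f := range union {
				if j, ok := index[f]; ok {
					row[i] = p[j]
				} else {
					row[i] = &FieldValue{IsNull: true}
				}
			}
			out.Points = append(out.Points, row)
		}
	}
	return out
}

func main() {
	v := func(x float64) *FieldValue { return &FieldValue{DoubleValue: &x} }
	s1 := Series{Fields: []string{"column0"}, Points: [][]*FieldValue{{v(1)}}}
	s2 := Series{Fields: []string{"column0", "column1"}, Points: [][]*FieldValue{{v(2), v(3)}}}
	merged := mergeColumns(s1, s2)
	fmt.Println(merged.Fields) // [column0 column1]
	for _, p := range merged.Points {
		for i, val := range p {
			if val.IsNull {
				fmt.Printf("%s=null ", merged.Fields[i])
			} else {
				fmt.Printf("%s=%v ", merged.Fields[i], *val.DoubleValue)
			}
		}
		fmt.Println()
	}
	// Prints roughly: "column0=1 column1=null" then "column0=2 column1=3",
	// i.e. the first point gets a null for the column it never had.
}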
5 changes: 4 additions & 1 deletion src/common/serialize_series.go
@@ -6,7 +6,10 @@ import (
"regexp"
)

var TRUE = true
var (
TRUE = true
FALSE = false
)

type TimePrecision int

34 changes: 5 additions & 29 deletions src/engine/passthrough_engine.go
@@ -3,8 +3,10 @@ package engine
// This engine buffers points and passes them through without modification. Works for queries
// that can't be aggregated locally or queries that don't require it like deletes and drops.
import (
	log "code.google.com/p/log4go"
	"common"
	"protocol"

	log "code.google.com/p/log4go"
)

type PassthroughEngine struct {
@@ -45,34 +47,8 @@ func NewPassthroughEngineWithLimit(responseChan chan *protocol.Response, maxPoin
}

func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
	self.responseType = &queryResponse
	series := &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}
	self.limiter.calculateLimitAndSlicePoints(series)
	if len(series.Points) == 0 {
		return false
	}

	if self.response == nil {
		self.response = &protocol.Response{
			Type:   self.responseType,
			Series: series,
		}
	} else if *self.response.Series.Name != *seriesName {
		self.responseChan <- self.response
		self.response = &protocol.Response{
			Type:   self.responseType,
			Series: series,
		}
	} else if len(self.response.Series.Points) > self.maxPointsInResponse {
		self.responseChan <- self.response
		self.response = &protocol.Response{
			Type:   self.responseType,
			Series: series,
		}
	} else {
		self.response.Series.Points = append(self.response.Series.Points, point)
	}
	return !self.limiter.hitLimit(*seriesName)
	return self.YieldSeries(series)
}

func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool {
@@ -108,7 +84,7 @@ func (self *PassthroughEngine) YieldSeries(seriesIncoming *protocol.Series) bool
			Series: seriesIncoming,
		}
	} else {
		self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...)
		self.response.Series = common.MergeSeries(self.response.Series, seriesIncoming)
	}
	return !self.limiter.hitLimit(seriesIncoming.GetName())
	//return true
46 changes: 46 additions & 0 deletions src/integration/benchmark_test.go
@@ -286,6 +286,52 @@ func (self *IntegrationSuite) TestSmallGroupByIntervals(c *C) {
	c.Assert(serieses[0].Points[0][1], Equals, 1.0)
}

// issue #392
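// Write the same series into two different shards with different column sets
// (the second batch's timestamp is more than a week in the past, so it should
// land in an older shard) and check that a query spanning both shards returns
// the union of the columns, with nulls where a point has no value for a column.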
func (self *IntegrationSuite) TestDifferentColumnsAcrossShards(c *C) {
	i := 0.0
	serieses := self.createPointsFromFunc("test_different_columns_across_shards", 1, 1, func(_ int) float64 { i++; return i })
	now := time.Now().Truncate(24 * time.Hour)
	serieses[0].Columns = []string{"column0", "time"}
	serieses[0].Points[0] = append(serieses[0].Points[0], now.Unix())
	self.server.WriteData(serieses, c, "s")
	serieses = self.createPointsFromFunc("test_different_columns_across_shards", 2, 1, func(_ int) float64 { i++; return i })
	serieses[0].Columns = []string{"column0", "column1", "time"}
	serieses[0].Points[0] = append(serieses[0].Points[0], now.Add(-8*24*time.Hour).Unix())
	self.server.WriteData(serieses, c, "s")
	time.Sleep(time.Second)
	serieses = self.server.RunQuery("select * from test_different_columns_across_shards", "s", c)
	c.Assert(serieses, HasLen, 1)

	maps := toMap(serieses[0])
	c.Assert(maps[0]["column0"], Equals, 1.0)
	c.Assert(maps[0]["column1"], IsNil)
	c.Assert(maps[1]["column0"], Equals, 2.0)
	c.Assert(maps[1]["column1"], Equals, 3.0)
}

// issue #392
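// Same scenario as above with the order reversed: here the batch with fewer
// columns is the one written to the older shard.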
func (self *IntegrationSuite) TestDifferentColumnsAcrossShards2(c *C) {
	i := 0.0
	serieses := self.createPointsFromFunc("test_different_columns_across_shards_2", 1, 1, func(_ int) float64 { i++; return i })
	now := time.Now().Truncate(24 * time.Hour)
	serieses[0].Columns = []string{"column1", "time"}
	serieses[0].Points[0] = append(serieses[0].Points[0], now.Add(-13*24*time.Hour).Unix())
	self.server.WriteData(serieses, c, "s")
	serieses = self.createPointsFromFunc("test_different_columns_across_shards_2", 2, 1, func(_ int) float64 { i++; return i })
	serieses[0].Columns = []string{"column1", "column0", "time"}
	serieses[0].Points[0] = append(serieses[0].Points[0], now.Unix())
	self.server.WriteData(serieses, c, "s")
	time.Sleep(time.Second)
	serieses = self.server.RunQuery("select * from test_different_columns_across_shards_2", "s", c)
	c.Assert(serieses, HasLen, 1)

	maps := toMap(serieses[0])
	c.Assert(maps[0]["column0"], Equals, 3.0)
	c.Assert(maps[0]["column1"], Equals, 2.0)
	c.Assert(maps[1]["column0"], IsNil)
	c.Assert(maps[1]["column1"], Equals, 1.0)
}

func (self *IntegrationSuite) TestExplainsWithPassthrough(c *C) {
	data := `
[{