Skip to content

Commit

Permalink
fix #456. Continuous queries failed if column had null value
Browse files Browse the repository at this point in the history
If one or more columns had null value and they were used in the group by
clause, they caused the continuous query to fail on the write. That's
because protobuf doesn't like null values in repeated fields.
  • Loading branch information
jvshahid committed Apr 18, 2014
1 parent a9cb07d commit 5d02db6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
6 changes: 5 additions & 1 deletion src/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
log "code.google.com/p/log4go"
)

var (
TRUE = true
)

type QueryEngine struct {
query *parser.SelectQuery
fields []string
Expand Down Expand Up @@ -630,7 +634,7 @@ func (self *QueryEngine) yieldValuesForTableAndGroups(table string, groups []Gro
case int64:
point.Values = append(point.Values, &protocol.FieldValue{Int64Value: &x})
case nil:
point.Values = append(point.Values, nil)
point.Values = append(point.Values, &protocol.FieldValue{IsNull: &TRUE})
}
}

Expand Down
10 changes: 10 additions & 0 deletions src/integration/helpers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ func (self *Server) WaitForServerToSync() {
panic("servers didn't sync")
}

// optional db
func (self *Server) GetClient(db string, c *C) *influxdb.Client {
client, err := influxdb.NewClient(&influxdb.ClientConfig{
Host: fmt.Sprintf("localhost:%d", self.apiPort),
Database: db,
})
c.Assert(err, IsNil)
return client
}

func (self *Server) WriteData(data interface{}, c *C, precision ...influxdb.TimePrecision) {
client, err := influxdb.NewClient(&influxdb.ClientConfig{
Username: "user",
Expand Down
44 changes: 43 additions & 1 deletion src/integration/multiple_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"net/url"
"os"
"time"

influxdb "github.com/influxdb/influxdb-go"
. "launchpad.net/gocheck"
)

Expand Down Expand Up @@ -62,7 +64,7 @@ func (self *ServerSuite) SetUpSuite(c *C) {
}
self.serverProcesses[0].SetSslOnly(true)
client := self.serverProcesses[0].GetClient("", c)
dbs := []string {"full_rep", "test_rep", "single_rep", "test_cq", "drop_db"}
dbs := []string{"full_rep", "test_rep", "single_rep", "test_cq", "drop_db"}
for _, db := range dbs {
c.Assert(client.CreateDatabase(db), IsNil)
}
Expand Down Expand Up @@ -641,6 +643,46 @@ func (self *ServerSuite) TestContinuousQueryGroupByOperations(c *C) {
self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 2;", false, c)
}

func (self *ServerSuite) TestContinuousQueryGroupByOperationsWithNullColumns(c *C) {
client := self.serverProcesses[0].GetClient("test_cq", c)
for i := 0; i < 2; i++ {
columns := []string{"a"}
values := []interface{}{1}
if i == 1 {
columns = append(columns, "b")
values = append(values, 2)
}
series := []*influxdb.Series{
&influxdb.Series{
Name: "test_null_columns",
Columns: columns,
Points: [][]interface{}{
values,
},
},
}
c.Assert(client.WriteSeries(series), IsNil)
}

self.serverProcesses[0].WaitForServerToSync()

_, err := client.Query("select count(a), b from test_null_columns group by time(1s), b into test_null_columns.1s")
c.Assert(err, IsNil)

// wait for the query to run
time.Sleep(2 * time.Second)

self.serverProcesses[0].WaitForServerToSync()

s, err := client.Query("select count(count) from test_null_columns.1s")
c.Assert(err, IsNil)
c.Assert(s, HasLen, 1)
maps := ToMap(s[0])
c.Assert(maps, HasLen, 1)
// make sure the continuous query inserted two points
c.Assert(maps[0]["count"], Equals, 2.0)
}

func (self *ServerSuite) TestContinuousQueryInterpolation(c *C) {
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s1 into :series_name.foo;", false, c)
self.serverProcesses[0].QueryAsRoot("test_cq", "select * from s2 into :series_name.foo.[c3];", false, c)
Expand Down

0 comments on commit 5d02db6

Please sign in to comment.