Skip to content

Commit

Permalink
Close #682. Add backfill option to into clause parser
Browse files Browse the repository at this point in the history
Signed-off-by: Dhammika Pathirana <dhammika@gmail.com>
  • Loading branch information
dhammika authored and jvshahid committed Jul 1, 2014
1 parent 39acc29 commit 76c5024
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 5 deletions.
8 changes: 6 additions & 2 deletions src/coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,13 @@ func (s *RaftServer) CreateContinuousQuery(db string, query string) error {
}

// if there are already-running queries, we need to initiate a backfill
if duration != nil && !s.clusterConfig.LastContinuousQueryRunTime().IsZero() {
if selectQuery.GetIntoClause().Backfill &&
!s.clusterConfig.LastContinuousQueryRunTime().IsZero() {
zeroTime := time.Time{}
currentBoundary := time.Now().Truncate(*duration)
currentBoundary := time.Now()
if duration != nil {
currentBoundary = currentBoundary.Truncate(*duration)
}
go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary)
} else {
// TODO: make continuous queries backfill for queries that don't have a group by time
Expand Down
73 changes: 73 additions & 0 deletions src/integration/multiple_servers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,79 @@ func (self *ServerSuite) TestContinuousQueryWithMixedGroupByOperations(c *C) {
c.Assert(means["/register"], Equals, 4.5)
}

func (self *ServerSuite) TestContinuousQueryBackfillOperations(c *C) {
defer self.serverProcesses[0].RemoveAllContinuousQueries("test_cq", c)

data := fmt.Sprintf(`[
{
"name": "cqbackfilltest",
"columns": ["time", "reqtime", "url"],
"points": [
[0, 8.0, "/login"],
[0, 3.0, "/list"],
[0, 4.0, "/register"],
[5, 9.0, "/login"],
[5, 4.0, "/list"],
[5, 5.0, "/register"]
]
}
]`)

self.serverProcesses[0].Post("/db/test_cq/series?u=paul&p=pass&time_precision=s", data, c)
// wait for the data to get written
self.serverProcesses[0].WaitForServerToSync()
// wait for the query to run
time.Sleep(time.Second)

self.serverProcesses[0].QueryAsRoot("test_cq", "select count(reqtime), url from cqbackfilltest group by time(1s), url into cqbackfill.1s", false, c)
defer self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c)
// wait for the continuous query to run
time.Sleep(time.Second)
// wait for the continuous queries to propagate
self.serverProcesses[0].WaitForServerToSync()

// query with backfill on
self.serverProcesses[0].QueryAsRoot("test_cq", "select count(reqtime), url from cqbackfilltest group by time(10s), url into cqbackfill_on.10s backfill(true)", false, c)
defer self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 2;", false, c)
// wait for the continuous query to run
time.Sleep(time.Second)
// wait for the continuous queries to propagate
self.serverProcesses[0].WaitForServerToSync()

// query with backfill off
self.serverProcesses[0].QueryAsRoot("test_cq", "select count(reqtime), url from cqbackfilltest group by time(10s), url into cqbackfill_off.10s backfill(false)", false, c)
defer self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 3;", false, c)
// wait for the continuous query to run
time.Sleep(time.Second)
// wait for the continuous queries to propagate
self.serverProcesses[0].WaitForServerToSync()

// check continuous queries
collection := self.serverProcesses[0].QueryAsRoot("test_cq", "list continuous queries;", false, c)
series := collection.GetSeries("continuous queries", c)
c.Assert(series.Points, HasLen, 3)

// check backfill_on query results
collection = self.serverProcesses[0].QueryAsRoot("test_cq", "select * from cqbackfill_on.10s", false, c)
c.Assert(len(collection.Members), Equals, 1)
series = collection.GetSeries("cqbackfill_on.10s", c)

counts := map[string]float64{}
for i := 0; i < 3; i++ {
count := series.GetValueForPointAndColumn(i, "count", c)
url := series.GetValueForPointAndColumn(i, "url", c)
counts[url.(string)] = count.(float64)
}

c.Assert(counts["/login"], Equals, float64(2))
c.Assert(counts["/list"], Equals, float64(2))
c.Assert(counts["/register"], Equals, float64(2))

// check backfill_off query results
collection = self.serverProcesses[0].QueryAsRoot("test_cq", "select * from cqbackfill_off.10s", false, c)
c.Assert(len(collection.Members), Equals, 0)
}

func (self *ServerSuite) TestChangingRaftPort(c *C) {
for _, server := range self.serverProcesses {
server.Stop()
Expand Down
3 changes: 3 additions & 0 deletions src/parser/frees.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ free_select_query (select_query *q)

if (q->into_clause) {
free_value(q->into_clause->target);
if (q->into_clause->backfill_function) {
free(q->into_clause->backfill_function);
}
free(q->into_clause);
}

Expand Down
41 changes: 38 additions & 3 deletions src/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ type From struct {
type Operation int

type IntoClause struct {
Target *Value
Target *Value
Backfill bool
BackfillValue *Value
}

type BasicQuery struct {
Expand Down Expand Up @@ -84,7 +86,13 @@ type Query struct {
}

func (self *IntoClause) GetString() string {
return self.Target.GetString()
buffer := bytes.NewBufferString("")

buffer.WriteString(self.Target.GetString())
if self.BackfillValue != nil {
fmt.Fprintf(buffer, " backfill(%s)", self.BackfillValue.GetString())
}
return buffer.String()
}

func (self *Query) GetQueryString() string {
Expand Down Expand Up @@ -439,12 +447,39 @@ func GetIntoClause(intoClause *C.into_clause) (*IntoClause, error) {
return nil, nil
}

backfill := true
var backfillValue *Value = nil

target, err := GetValue(intoClause.target)
if err != nil {
return nil, err
}

return &IntoClause{target}, nil
if intoClause.backfill_function != nil {
fun, err := GetValue(intoClause.backfill_function)
if err != nil {
return nil, err
}
if fun.Name != "backfill" {
return nil, fmt.Errorf("You can't use %s with into", fun.Name)
}

if len(fun.Elems) != 1 {
return nil, fmt.Errorf("`backfill` accepts only one argument")
}

backfillValue = fun.Elems[0]
backfill, err = strconv.ParseBool(backfillValue.GetString())
if err != nil {
return nil, fmt.Errorf("`backfill` accepts only bool arguments")
}
}

return &IntoClause{
Target: target,
Backfill: backfill,
BackfillValue: backfillValue,
}, nil
}

func GetWhereCondition(condition *C.condition) (*WhereCondition, error) {
Expand Down
3 changes: 3 additions & 0 deletions src/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ func (self *QueryParserSuite) TestGetQueryString(c *C) {
"select count(value) from t group by time(1h) into value.hourly",
"select count(value), host from t group by time(1h), host into value.hourly.[:host]",
"select count(value), host from t group by time(1h), host where time > now() - 1h into value.hourly.[:host]",
"select count(value), host from t group by time(1h), host into value.hourly.[:host] backfill(F)",
"select count(value), host from t group by time(1h), host where time > now() - 1h into value.hourly.[:host] backfill(true)",
"select count(value) from t, host into value.hourly.[:host] backfill(1)",
"delete from foo",
} {
fmt.Printf("testing %s\n", query)
Expand Down
8 changes: 8 additions & 0 deletions src/parser/query.yacc
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ INTO_CLAUSE:
{
$$ = malloc(sizeof(into_clause));
$$->target = $2;
$$->backfill_function = NULL;
}
|
INTO INTO_VALUE FUNCTION_CALL
{
$$ = malloc(sizeof(into_clause));
$$->target = $2;
$$->backfill_function = $3;
}
|
{
Expand Down
1 change: 1 addition & 0 deletions src/parser/query_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ typedef struct {

typedef struct {
value *target;
value *backfill_function;
} into_clause;

typedef struct {
Expand Down

0 comments on commit 76c5024

Please sign in to comment.