Merge pull request #1900 from influxdb/update-limit-and-offset
Update LIMIT/OFFSET and SLIMIT/SOFFSET
toddboom committed Mar 10, 2015
2 parents 0063dd1 + 1a545a1 · commit 031ea1d
Showing 8 changed files with 148 additions and 17 deletions.
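In short: LIMIT and OFFSET now page through the points within each series, while the new SLIMIT and SOFFSET clauses page through the series themselves. A minimal sketch of the distinction, written in the same embedded-query style the tests below use (the measurement and field names are illustrative, not taken from this change):

```go
package main

import "fmt"

func main() {
	// LIMIT/OFFSET operate on points: skip the first 20 points of each
	// matching series, then return at most 10 of them.
	points := `SELECT value FROM cpu LIMIT 10 OFFSET 20`

	// SLIMIT/SOFFSET operate on series: GROUP BY * yields one series per
	// tag set; skip the first 5 series, then return at most 5 of them.
	series := `SELECT mean(value) FROM cpu GROUP BY * SLIMIT 5 SOFFSET 5`

	fmt.Println(points)
	fmt.Println(series)
}
```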
59 changes: 59 additions & 0 deletions cmd/influxd/server_integration_test.go
@@ -372,6 +372,65 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"where_events","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`,
},

// LIMIT and OFFSET tests

{
name: "limit on points",
write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [
{"name": "limit", "timestamp": "2009-11-10T23:00:02Z","fields": {"foo": "bar"}, "tags": {"tennant": "paul"}},
{"name": "limit", "timestamp": "2009-11-10T23:00:03Z","fields": {"foo": "baz"}, "tags": {"tennant": "paul"}},
{"name": "limit", "timestamp": "2009-11-10T23:00:04Z","fields": {"foo": "bat"}, "tags": {"tennant": "paul"}},
{"name": "limit", "timestamp": "2009-11-10T23:00:05Z","fields": {"foo": "bar"}, "tags": {"tennant": "todd"}}
]}`,
query: `select foo from "%DB%"."%RP%".limit LIMIT 2`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`,
},
{
name: "limit higher than the number of data points",
query: `select foo from "%DB%"."%RP%".limit LIMIT 20`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"],["2009-11-10T23:00:05Z","bar"]]}]}]}`,
},
{
name: "limit and offset",
query: `select foo from "%DB%"."%RP%".limit LIMIT 2 OFFSET 1`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"]]}]}]}`,
},
{
name: "limit + offset higher than number of points",
query: `select foo from "%DB%"."%RP%".limit LIMIT 3 OFFSET 3`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:05Z","bar"]]}]}]}`,
},
{
name: "offset higher than number of points",
query: `select foo from "%DB%"."%RP%".limit LIMIT 2 OFFSET 20`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`,
},
{
name: "limit on points with group by time",
query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"]]}]}]}`,
},
{
name: "limit higher than the number of data points with group by time",
query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 20`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:02Z","bar"],["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"],["2009-11-10T23:00:05Z","bar"]]}]}]}`,
},
{
name: "limit and offset with group by time",
query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2 OFFSET 1`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:03Z","baz"],["2009-11-10T23:00:04Z","bat"]]}]}]}`,
},
{
name: "limit + offset higher than number of points with group by time",
query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 3 OFFSET 3`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"],"values":[["2009-11-10T23:00:05Z","bar"]]}]}]}`,
},
{
name: "offset higher than number of points with group by time",
query: `select foo from "%DB%"."%RP%".limit WHERE time >= '2009-11-10T23:00:02Z' AND time < '2009-11-10T23:00:06Z' GROUP BY time(1s) LIMIT 2 OFFSET 20`,
expected: `{"results":[{"series":[{"name":"limit","columns":["time","foo"]}]}]}`,
},

// Metadata display tests

{
10 changes: 7 additions & 3 deletions influxql/INFLUXQL.md
@@ -97,8 +97,8 @@ INNER INSERT INTO KEY KEYS LIMIT
SHOW MEASUREMENT MEASUREMENTS OFFSET ON ORDER
PASSWORD POLICY POLICIES PRIVILEGES QUERIES QUERY
READ REPLICATION RETENTION REVOKE SELECT SERIES
TAG TO USER USERS VALUES WHERE
WITH WRITE
SLIMIT SOFFSET TAG TO USER USERS
VALUES WHERE WITH WRITE
```

## Literals
@@ -569,7 +569,7 @@ REVOKE READ ON mydb FROM jdoe;
```
select_stmt = fields from_clause [ into_clause ] [ where_clause ]
[ group_by_clause ] [ order_by_clause ] [ limit_clause ]
[ offset_clause ] .
[ offset_clause ] [ slimit_clause ] [ soffset_clause ] .
```

#### Examples:
@@ -590,6 +590,10 @@ limit_clause = "LIMIT" int_lit .
offset_clause = "OFFSET" int_lit .
slimit_clause = "SLIMIT" int_lit .
soffset_clause = "SOFFSET" int_lit .
on_clause = db_name .
order_by_clause = "ORDER BY" sort_fields .
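Per the updated select_stmt production, the optional clauses come in a fixed order: LIMIT, OFFSET, SLIMIT, SOFFSET. A hypothetical query exercising all four, as a Go string in the style of the parser tests (the measurement and field names are placeholders, not taken from this change):

```go
package main

import "fmt"

func main() {
	// Clause order follows the select_stmt production above; "cpu" and
	// "value" are placeholder names.
	q := `SELECT mean(value) FROM cpu WHERE time > now() - 1h ` +
		`GROUP BY * LIMIT 100 OFFSET 10 SLIMIT 5 SOFFSET 1`
	fmt.Println(q)
}
```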
11 changes: 9 additions & 2 deletions influxql/ast.go
@@ -547,13 +547,18 @@ type SelectStatement struct {
// Fields to sort results by
SortFields SortFields

// Maximum number of rows to be returned.
// Unlimited if zero.
// Maximum number of rows to be returned. Unlimited if zero.
Limit int

// Returns rows starting at an offset from the first row.
Offset int

// Maximum number of series to be returned. Unlimited if zero.
SLimit int

// Returns series starting at an offset from the first one.
SOffset int

// memoize the group by interval
groupByInterval time.Duration

@@ -571,6 +576,8 @@ func (s *SelectStatement) Clone() *SelectStatement {
Condition: CloneExpr(s.Condition),
Limit: s.Limit,
Offset: s.Offset,
SLimit: s.SLimit,
SOffset: s.SOffset,
}
if s.Target != nil {
other.Target = &Target{Measurement: s.Target.Measurement, Database: s.Target.Database}
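The zero value of the new fields means "clause absent", mirroring Limit and Offset, and Clone must copy them or a cloned statement would silently drop its series paging. A small sketch of that contract (the import path is assumed from this repository's layout at the time):

```go
package main

import (
	"fmt"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	// SLimit/SOffset default to zero, meaning unlimited series and no
	// series offset, just like Limit and Offset for points.
	stmt := &influxql.SelectStatement{SLimit: 2, SOffset: 1}

	// With the Clone change above, series paging survives the copy.
	clone := stmt.Clone()
	fmt.Println(clone.SLimit, clone.SOffset) // 2 1
}
```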
46 changes: 41 additions & 5 deletions influxql/engine.go
@@ -121,6 +121,27 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
resultValues[i] = append(vals, time.Unix(0, t).UTC())
}

// now limit the number of data points returned by the limit and offset
if pointCountInResult > 1 && (m.stmt.Limit > 0 || m.stmt.Offset > 0) {
if m.stmt.Offset > len(resultValues) {
out <- &Row{
Name: m.MeasurementName,
Tags: m.TagSet.Tags,
}

return
} else {
limit := m.stmt.Limit
if m.stmt.Offset+m.stmt.Limit > len(resultValues) {
limit = len(resultValues) - m.stmt.Offset
}

resultTimes = resultTimes[m.stmt.Offset : m.stmt.Offset+limit]
resultValues = resultValues[m.stmt.Offset : m.stmt.Offset+limit]
}
m.TMin = resultTimes[0]
}

// now loop through the aggregate functions and populate everything
for i, c := range aggregates {
if err := m.processAggregate(c, reduceFuncs[i], resultValues); err != nil {
@@ -384,6 +405,21 @@ func (m *MapReduceJob) processRawResults(resultValues [][]interface{}) *Row {
row.Values = append(row.Values, vals)
}

// apply limit and offset, if applicable
// TODO: make this so it doesn't read the whole result set into memory
if m.stmt.Limit > 0 || m.stmt.Offset > 0 {
if m.stmt.Offset > len(row.Values) {
row.Values = nil
} else {
limit := m.stmt.Limit
if m.stmt.Offset+m.stmt.Limit > len(row.Values) {
limit = len(row.Values) - m.stmt.Offset
}

row.Values = row.Values[m.stmt.Offset : m.stmt.Offset+limit]
}
}

return row
}

@@ -494,15 +530,15 @@ func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
}

// LIMIT and OFFSET the unique series
if stmt.Limit > 0 || stmt.Offset > 0 {
if stmt.Offset > len(jobs) {
if stmt.SLimit > 0 || stmt.SOffset > 0 {
if stmt.SOffset > len(jobs) {
jobs = nil
} else {
if stmt.Offset+stmt.Limit > len(jobs) {
stmt.Limit = len(jobs) - stmt.Offset
if stmt.SOffset+stmt.SLimit > len(jobs) {
stmt.SLimit = len(jobs) - stmt.SOffset
}

jobs = jobs[stmt.Offset : stmt.Offset+stmt.Limit]
jobs = jobs[stmt.SOffset : stmt.SOffset+stmt.SLimit]
}
}

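The same clamp-and-slice pattern now appears three times: on aggregated result values, on raw row values, and on the series jobs in Plan. A standalone sketch of that pattern (an illustrative helper of my own, not code from this commit):

```go
package main

import "fmt"

// page slices values with the clamping rules used above: zero limit and
// offset mean "return everything", an offset past the end yields nothing,
// and a limit overrunning the end is clamped to the remaining elements.
// Note that, as in the code above, an offset with a zero limit also
// produces an empty result.
func page(values []int, limit, offset int) []int {
	if limit <= 0 && offset <= 0 {
		return values
	}
	if offset > len(values) {
		return nil
	}
	if offset+limit > len(values) {
		limit = len(values) - offset
	}
	return values[offset : offset+limit]
}

func main() {
	vals := []int{1, 2, 3, 4}
	fmt.Println(page(vals, 2, 1))  // [2 3]
	fmt.Println(page(vals, 3, 3))  // [4]
	fmt.Println(page(vals, 2, 20)) // []
}
```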
10 changes: 10 additions & 0 deletions influxql/parser.go
@@ -566,6 +566,16 @@ func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, e
return nil, err
}

// Parse series limit: "SLIMIT <n>".
if stmt.SLimit, err = p.parseOptionalTokenAndInt(SLIMIT); err != nil {
return nil, err
}

// Parse series offset: "SOFFSET <n>".
if stmt.SOffset, err = p.parseOptionalTokenAndInt(SOFFSET); err != nil {
return nil, err
}

return stmt, nil
}

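Both clauses reuse a parseOptionalTokenAndInt helper whose body is not part of this diff. A self-contained sketch of the contract it appears to have (every name below is a stand-in, not the parser's real internals):

```go
package parsersketch

import (
	"fmt"
	"strconv"
)

// tokenStream is a stand-in for the parser's scanner.
type tokenStream interface {
	// Scan returns the next token and its literal text.
	Scan() (tok int, lit string)
	// Unscan pushes the last token back onto the stream.
	Unscan()
}

// parseOptionalTokenAndInt consumes "<want> <int>" if want is the next
// token; otherwise it leaves the stream untouched and returns zero, which
// SelectStatement treats as "clause absent".
func parseOptionalTokenAndInt(s tokenStream, want int) (int, error) {
	tok, _ := s.Scan()
	if tok != want {
		s.Unscan()
		return 0, nil
	}
	_, lit := s.Scan()
	n, err := strconv.Atoi(lit)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("expected non-negative integer, got %q", lit)
	}
	return n, nil
}
```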
11 changes: 11 additions & 0 deletions influxql/parser_test.go
@@ -137,6 +137,17 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SELECT statement with SLIMIT and SOFFSET
{
s: `SELECT field1 FROM myseries SLIMIT 10 SOFFSET 5`,
stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.VarRef{Val: "field1"}}},
Source: &influxql.Measurement{Name: "myseries"},
SLimit: 10,
SOffset: 5,
},
},

// SELECT * FROM cpu WHERE host = 'serverC' AND region =~ /.*west.*/
{
s: `SELECT * FROM cpu WHERE host = 'serverC' AND region =~ /.*west.*/`,
8 changes: 6 additions & 2 deletions influxql/token.go
@@ -86,7 +86,6 @@
KEY
KEYS
LIMIT
SHOW
MEASUREMENT
MEASUREMENTS
OFFSET
@@ -104,6 +103,9 @@
REVOKE
SELECT
SERIES
SHOW
SLIMIT
SOFFSET
TAG
TO
USER
@@ -184,7 +186,6 @@ var tokens = [...]string{
KEY: "KEY",
KEYS: "KEYS",
LIMIT: "LIMIT",
SHOW: "SHOW",
MEASUREMENT: "MEASUREMENT",
MEASUREMENTS: "MEASUREMENTS",
OFFSET: "OFFSET",
@@ -202,6 +203,9 @@
REVOKE: "REVOKE",
SELECT: "SELECT",
SERIES: "SERIES",
SHOW: "SHOW",
SLIMIT: "SLIMIT",
SOFFSET: "SOFFSET",
TAG: "TAG",
TO: "TO",
USER: "USER",
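The const block and the tokens name table move in lockstep here: SHOW is relocated in both, and SLIMIT/SOFFSET are inserted in both, keeping the lists alphabetical so the correspondence stays easy to audit. The name table is indexed by the constants themselves, so every new token needs a matching entry. A minimal sketch of the pattern (illustrative, not the file's actual contents):

```go
package tokensketch

import "strings"

// Token mirrors the iota-based token pattern used above.
type Token int

const (
	ILLEGAL Token = iota
	LIMIT
	OFFSET
	SHOW
	SLIMIT
	SOFFSET
)

// tokens is indexed by the constants themselves, so every new token
// needs a matching entry here.
var tokens = [...]string{
	ILLEGAL: "ILLEGAL",
	LIMIT:   "LIMIT",
	OFFSET:  "OFFSET",
	SHOW:    "SHOW",
	SLIMIT:  "SLIMIT",
	SOFFSET: "SOFFSET",
}

// keywords lets the scanner map identifier text back to a keyword token.
var keywords = map[string]Token{}

func init() {
	for tok := LIMIT; tok <= SOFFSET; tok++ {
		keywords[strings.ToLower(tokens[tok])] = tok
	}
}
```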
10 changes: 5 additions & 5 deletions server_test.go
@@ -1531,37 +1531,37 @@ func TestServer_LimitAndOffset(t *testing.T) {
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": host}, Timestamp: time.Unix(int64(i), 0), Fields: map[string]interface{}{"value": float64(i)}}})
}

results := s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 20`), "foo", nil)
results := s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * SLIMIT 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during COUNT: %s", res.Err)
} else if len(res.Series) != 9 {
t.Fatalf("unexpected 9 series back but got %d", len(res.Series))
}

results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 2 OFFSET 1`), "foo", nil)
results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * SLIMIT 2 SOFFSET 1`), "foo", nil)
expected := `{"series":[{"name":"cpu","tags":{"host":"server-2","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server-3","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}`
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during COUNT: %s", res.Err)
} else if s := mustMarshalJSON(res); s != expected {
t.Fatalf("unexpected row(0) during COUNT:\n exp: %s\n got: %s", expected, s)
}

results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 2 OFFSET 3`), "foo", nil)
results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * SLIMIT 2 SOFFSET 3`), "foo", nil)
expected = `{"series":[{"name":"cpu","tags":{"host":"server-4","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]},{"name":"cpu","tags":{"host":"server-5","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}`
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during COUNT: %s", res.Err)
} else if s := mustMarshalJSON(res); s != expected {
t.Fatalf("unexpected row(0) during COUNT:\n exp: %s\n got: %s", expected, s)
}

results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 3 OFFSET 8`), "foo", nil)
results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * SLIMIT 3 SOFFSET 8`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during COUNT: %s", res.Err)
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","tags":{"host":"server-9","region":"us-east"},"columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}` {
t.Fatalf("unexpected row(0) during COUNT: %s", s)
}

results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * LIMIT 3 OFFSET 20`), "foo", nil)
results = s.ExecuteQuery(MustParseQuery(`SELECT count(value) FROM cpu GROUP BY * SLIMIT 3 SOFFSET 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during COUNT: %s", res.Err)
}
