Skip to content

Commit

Permalink
Support subquery execution in the query language
Browse files Browse the repository at this point in the history
This adds query syntax support for subqueries and adds support to the
query engine to execute queries on subqueries.

Subqueries act as a source for another query. It is the equivalent of
writing the results of a query to a temporary database, executing
a query on that temporary database, and then deleting the database
(except this is all performed in-memory).

The syntax is like this:

    SELECT sum(derivative) FROM (SELECT derivative(mean(value)) FROM cpu GROUP BY *)

This will execute derivative and then sum the result of those derivatives.
Another example:

    SELECT max(min) FROM (SELECT min(value) FROM cpu GROUP BY host)

This would let you find the maximum minimum value of each host.

There is complete freedom to mix subqueries with auxiliary fields. The only
caveat is that the following two queries:

    SELECT mean(value) FROM cpu
    SELECT mean(value) FROM (SELECT value FROM cpu)

Have different performance characteristics. The first will calculate
`mean(value)` at the shard level and will be faster, especially when it comes to
clustered setups. The second will process the mean at the top level and will not
include that optimization.
  • Loading branch information
jsternberg committed Jan 7, 2017
1 parent 153277c commit d7c8c7c
Show file tree
Hide file tree
Showing 40 changed files with 4,008 additions and 1,497 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7709](https://github.com/influxdata/influxdb/pull/7709): Add clear command to cli.
- [#7688](https://github.com/influxdata/influxdb/pull/7688): Adding ability to use parameters in queries in the v2 client using the `Parameters` map in the `Query` struct.
- [#7323](https://github.com/influxdata/influxdb/pull/7323): Allow add items to array config via ENV
- [#4619](https://github.com/influxdata/influxdb/issues/4619): Support subquery execution in the query language.

### Bugfixes

Expand Down
10 changes: 7 additions & 3 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,13 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Initialize query executor.
s.QueryExecutor = influxql.NewQueryExecutor()
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
MetaClient: s.MetaClient,
TaskManager: s.QueryExecutor.TaskManager,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
MetaClient: s.MetaClient,
TaskManager: s.QueryExecutor.TaskManager,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
ShardMapper: &coordinator.LocalShardMapper{
MetaClient: s.MetaClient,
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
},
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
Expand Down
195 changes: 195 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4549,6 +4549,201 @@ func TestServer_Query_GroupByTimeCutoffs(t *testing.T) {
}
}

func TestServer_Query_Subqueries(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`cpu,host=server01 usage_user=70i,usage_system=30i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01 usage_user=45i,usage_system=55i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01 usage_user=23i,usage_system=77i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 usage_user=11i,usage_system=89i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 usage_user=28i,usage_system=72i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 usage_user=12i,usage_system=53i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}

test.addQueries([]*Query{
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean FROM (SELECT mean(usage_user) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",31.5]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT value FROM (SELECT mean(usage_user) AS value FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",31.5]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(usage) FROM (SELECT 100 - usage_user AS usage FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",68.5]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT min(usage_user), host FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host"],"values":[["2000-01-01T00:00:00Z","server02"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host"],"values":[["2000-01-01T00:00:00Z","server02"],["2000-01-01T00:00:20Z","server01"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(min) FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",17]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT max(min), host FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","max","host"],"values":[["2000-01-01T00:00:20Z",23,"server01"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT mean(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0}]}`,
skip: true,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT max(usage_system) FROM (SELECT min(usage_user), usage_system FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","max"],"values":[["2000-01-01T00:00:00Z",89]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(top), host FROM (SELECT top(usage_user, host, 2) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min","host"],"values":[["2000-01-01T00:00:10Z",28,"server02"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(top), host FROM (SELECT top(usage_user, 2), host FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min","host"],"values":[["2000-01-01T00:00:10Z",45,"server01"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT count(host) FROM (SELECT top(usage_user, host, 2) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-01-01T00:00:00Z",2]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT sum(derivative) FROM (SELECT derivative(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",-4.6]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(max) FROM (SELECT 100 - max(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:00Z",30]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(usage_system) FROM (SELECT max(usage_user), 100 - usage_system FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:10Z",28]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

func TestServer_Query_SubqueryWithGroupBy(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`cpu,host=server01,region=uswest value=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=4i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=5i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=6i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=7i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=8i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=9i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=10i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=11i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=12i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=13i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=14i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=15i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=16i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}

test.addQueries([]*Query{
&Query{
name: "group by time(2s) - time(2s), host",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(2s), host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:04Z' GROUP BY time(2s)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",7.5],["2000-01-01T00:00:02Z",9.5]]}]}]}`,
},
&Query{
name: "group by time(4s), host - time(2s), host",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(2s), host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:04Z' GROUP BY time(4s), host`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",6.5]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",10.5]]}]}]}`,
},
&Query{
name: "group by time(2s), host - time(2s), host, region",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(2s), host, region) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:04Z' GROUP BY time(2s), host`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",5.5],["2000-01-01T00:00:02Z",7.5]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",9.5],["2000-01-01T00:00:02Z",11.5]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

func TestServer_Write_Precision(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
Expand Down
2 changes: 1 addition & 1 deletion coordinator/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type MetaClient interface {
RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
SetAdminPrivilege(username string, admin bool) error
SetPrivilege(username, database string, p influxql.Privilege) error
ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
UpdateUser(name, password string) error
UserPrivilege(username, database string) (*influxql.Privilege, error)
Expand Down
6 changes: 3 additions & 3 deletions coordinator/meta_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type MetaClient struct {
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
SetAdminPrivilegeFn func(username string, admin bool) error
SetPrivilegeFn func(username, database string, p influxql.Privilege) error
ShardsByTimeRangeFn func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
ShardGroupsByTimeRangeFn func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
UpdateUserFn func(name, password string) error
UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
Expand Down Expand Up @@ -127,8 +127,8 @@ func (c *MetaClient) SetPrivilege(username, database string, p influxql.Privileg
return c.SetPrivilegeFn(username, database, p)
}

func (c *MetaClient) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
return c.ShardsByTimeRangeFn(sources, tmin, tmax)
func (c *MetaClient) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) {
return c.ShardGroupsByTimeRangeFn(database, policy, min, max)
}

func (c *MetaClient) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error {
Expand Down
Loading

0 comments on commit d7c8c7c

Please sign in to comment.