Merge pull request #7646 from influxdata/js-4619-subqueries
Support subquery execution in the query language
jsternberg committed Jan 9, 2017
2 parents 9423300 + d7c8c7c commit 4a559c4
Showing 40 changed files with 4,008 additions and 1,497 deletions.
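
For context, this change adds subqueries to InfluxQL: a SELECT statement may now appear in the FROM clause of another SELECT, and the outer query consumes the inner query's output as its source. A minimal example of the new syntax, mirroring one of the test cases added in server_test.go below (measurement and field names are taken from those tests):

```sql
-- Take each host's minimum usage_user, then average those minimums across hosts.
SELECT mean(min) FROM (SELECT min(usage_user) FROM cpu GROUP BY host)
```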
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@ The stress tool `influx_stress` will be removed in a subsequent release. We reco
- [#7709](https://github.com/influxdata/influxdb/pull/7709): Add clear command to cli.
- [#7688](https://github.com/influxdata/influxdb/pull/7688): Adding ability to use parameters in queries in the v2 client using the `Parameters` map in the `Query` struct.
- [#7323](https://github.com/influxdata/influxdb/pull/7323): Allow add items to array config via ENV
- [#4619](https://github.com/influxdata/influxdb/issues/4619): Support subquery execution in the query language.

### Bugfixes

10 changes: 7 additions & 3 deletions cmd/influxd/run/server.go
@@ -179,9 +179,13 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Initialize query executor.
s.QueryExecutor = influxql.NewQueryExecutor()
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
- MetaClient: s.MetaClient,
- TaskManager: s.QueryExecutor.TaskManager,
- TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
+ MetaClient: s.MetaClient,
+ TaskManager: s.QueryExecutor.TaskManager,
+ TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
+ ShardMapper: &coordinator.LocalShardMapper{
+     MetaClient: s.MetaClient,
+     TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
+ },
Monitor: s.Monitor,
PointsWriter: s.PointsWriter,
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
195 changes: 195 additions & 0 deletions cmd/influxd/run/server_test.go
@@ -4549,6 +4549,201 @@ func TestServer_Query_GroupByTimeCutoffs(t *testing.T) {
}
}

func TestServer_Query_Subqueries(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`cpu,host=server01 usage_user=70i,usage_system=30i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01 usage_user=45i,usage_system=55i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01 usage_user=23i,usage_system=77i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 usage_user=11i,usage_system=89i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 usage_user=28i,usage_system=72i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02 usage_user=12i,usage_system=53i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}

test.addQueries([]*Query{
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean FROM (SELECT mean(usage_user) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",31.5]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT value FROM (SELECT mean(usage_user) AS value FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T00:00:00Z",31.5]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(usage) FROM (SELECT 100 - usage_user AS usage FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",68.5]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT min(usage_user), host FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host"],"values":[["2000-01-01T00:00:00Z","server02"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host"],"values":[["2000-01-01T00:00:00Z","server02"],["2000-01-01T00:00:20Z","server01"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(min) FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",17]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT max(min), host FROM (SELECT min(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","max","host"],"values":[["2000-01-01T00:00:20Z",23,"server01"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT host FROM (SELECT mean(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0}]}`,
skip: true,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT max(usage_system) FROM (SELECT min(usage_user), usage_system FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","max"],"values":[["2000-01-01T00:00:00Z",89]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(top), host FROM (SELECT top(usage_user, host, 2) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min","host"],"values":[["2000-01-01T00:00:10Z",28,"server02"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(top), host FROM (SELECT top(usage_user, 2), host FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min","host"],"values":[["2000-01-01T00:00:10Z",45,"server01"]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT count(host) FROM (SELECT top(usage_user, host, 2) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-01-01T00:00:00Z",2]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT sum(derivative) FROM (SELECT derivative(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","sum"],"values":[["2000-01-01T00:00:00Z",-4.6]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(max) FROM (SELECT 100 - max(usage_user) FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:00Z",30]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(usage_system) FROM (SELECT max(usage_user), 100 - usage_system FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:10Z",28]]}]}]}`,
},
&Query{
params: url.Values{"db": []string{"db0"}},
command: `SELECT min(value) FROM (SELECT max(usage_user), usage_user - usage_system AS value FROM cpu GROUP BY host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","min"],"values":[["2000-01-01T00:00:10Z",-44]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
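
With a server running and the data above written, the same subqueries can be exercised outside the test harness over the standard /query HTTP endpoint. A minimal sketch, not part of this PR, assuming a local instance on the default port 8086 and the db0 database used in the test:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
)

func main() {
	// Build the query string: db selects the database, q carries the InfluxQL text.
	params := url.Values{}
	params.Set("db", "db0")
	params.Set("q", `SELECT mean FROM (SELECT mean(usage_user) FROM cpu) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:30Z'`)

	// Address and port are assumptions (the InfluxDB default); adjust for your setup.
	resp, err := http.Get("http://localhost:8086/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The body has the same JSON shape as the exp strings in the test above.
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}
```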

func TestServer_Query_SubqueryWithGroupBy(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()

if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
t.Fatal(err)
}

writes := []string{
fmt.Sprintf(`cpu,host=server01,region=uswest value=1i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=2i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=3i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=uswest value=4i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=5i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=6i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=7i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=uswest value=8i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=9i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=10i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=11i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server01,region=useast value=12i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=13i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=14i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:01Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=15i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:02Z").UnixNano()),
fmt.Sprintf(`cpu,host=server02,region=useast value=16i %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:03Z").UnixNano()),
}
test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}

test.addQueries([]*Query{
&Query{
name: "group by time(2s) - time(2s), host",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(2s), host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:04Z' GROUP BY time(2s)`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",7.5],["2000-01-01T00:00:02Z",9.5]]}]}]}`,
},
&Query{
name: "group by time(4s), host - time(2s), host",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(2s), host) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:04Z' GROUP BY time(4s), host`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",6.5]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",10.5]]}]}]}`,
},
&Query{
name: "group by time(2s), host - time(2s), host, region",
params: url.Values{"db": []string{"db0"}},
command: `SELECT mean(mean) FROM (SELECT mean(value) FROM cpu GROUP BY time(2s), host, region) WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:00:04Z' GROUP BY time(2s), host`,
exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",5.5],["2000-01-01T00:00:02Z",7.5]]},{"name":"cpu","tags":{"host":"server02"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",9.5],["2000-01-01T00:00:02Z",11.5]]}]}]}`,
},
}...)

for i, query := range test.queries {
if i == 0 {
if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}
}
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}
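
To make the first expected result above easier to check by hand, the sketch below, which is not part of this PR, reproduces its arithmetic: the inner query averages value per 2-second bucket and host (collapsing the region tag), and the outer query then averages those per-host means within each 2-second bucket.

```go
package main

import "fmt"

// mean returns the arithmetic mean of its arguments.
func mean(vs ...float64) float64 {
	var sum float64
	for _, v := range vs {
		sum += v
	}
	return sum / float64(len(vs))
}

func main() {
	// Bucket [00:00:00, 00:00:02): each host's inner mean spans both regions.
	server01 := mean(1, 2, 9, 10)  // 5.5
	server02 := mean(5, 6, 13, 14) // 9.5
	fmt.Println(mean(server01, server02)) // 7.5 -> first value in the expected series

	// Bucket [00:00:02, 00:00:04).
	server01 = mean(3, 4, 11, 12) // 7.5
	server02 = mean(7, 8, 15, 16) // 11.5
	fmt.Println(mean(server01, server02)) // 9.5 -> second value in the expected series
}
```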

func TestServer_Write_Precision(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
2 changes: 1 addition & 1 deletion coordinator/meta_client.go
@@ -26,7 +26,7 @@ type MetaClient interface {
RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
SetAdminPrivilege(username string, admin bool) error
SetPrivilege(username, database string, p influxql.Privilege) error
- ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
+ ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
UpdateUser(name, password string) error
UserPrivilege(username, database string) (*influxql.Privilege, error)
6 changes: 3 additions & 3 deletions coordinator/meta_client_test.go
@@ -31,7 +31,7 @@ type MetaClient struct {
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
SetAdminPrivilegeFn func(username string, admin bool) error
SetPrivilegeFn func(username, database string, p influxql.Privilege) error
- ShardsByTimeRangeFn func(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error)
+ ShardGroupsByTimeRangeFn func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
UpdateUserFn func(name, password string) error
UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
@@ -127,8 +127,8 @@ func (c *MetaClient) SetPrivilege(username, database string, p influxql.Privileg
return c.SetPrivilegeFn(username, database, p)
}

- func (c *MetaClient) ShardsByTimeRange(sources influxql.Sources, tmin, tmax time.Time) (a []meta.ShardInfo, err error) {
-     return c.ShardsByTimeRangeFn(sources, tmin, tmax)
+ func (c *MetaClient) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) {
+     return c.ShardGroupsByTimeRangeFn(database, policy, min, max)
}

func (c *MetaClient) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error {
