Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make show series return IDs and work with limit and offset. #1771

Merged
merged 2 commits into from
Feb 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- [#1720](https://github.com/influxdb/influxdb/pull/1720): Parse Series IDs as unsigned 32-bits.
- [#1767](https://github.com/influxdb/influxdb/pull/1767): Drop Series was failing across shards. Issue #1761.
- [#1773](https://github.com/influxdb/influxdb/pull/1773): Fix bug when merging series together that have unequal number of points in a group by interval
- [#1771](https://github.com/influxdb/influxdb/pull/1771): Make `SHOW SERIES` return IDs and support `LIMIT` and `OFFSET`

### Features

Expand Down
69 changes: 23 additions & 46 deletions httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,50 +1652,27 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", ""}),
str2iface([]string{"server01", "uswest"}),
str2iface([]string{"server01", "useast"}),
str2iface([]string{"server02", "useast"}),
[]interface{}{1, "server01", ""},
[]interface{}{2, "server01", "uswest"},
[]interface{}{3, "server01", "useast"},
[]interface{}{4, "server02", "useast"},
},
},
{
Name: "gpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server02", "useast"}),
str2iface([]string{"server03", "caeast"}),
[]interface{}{5, "server02", "useast"},
[]interface{}{6, "server03", "caeast"},
},
},
},
},
},
},
},
// SHOW SERIES ... LIMIT
// {
// q: `SHOW SERIES LIMIT 1`,
// r: &influxdb.Results{
// Results: []*influxdb.Result{
// &influxdb.Result{
// Series: []*influxql.Row{
// &influxql.Row{
// Name: "cpu",
// Columns: []string{"host", "region"},
// Values: [][]interface{}{
// str2iface([]string{"server01", ""}),
// str2iface([]string{"server01", "uswest"}),
// str2iface([]string{"server01", "useast"}),
// str2iface([]string{"server02", "useast"}),
// },
// },
// },
// },
// },
// },
// },
// SHOW SERIES FROM
{
q: `SHOW SERIES FROM cpu`,
r: &influxdb.Results{
Expand All @@ -1704,12 +1681,12 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", ""}),
str2iface([]string{"server01", "uswest"}),
str2iface([]string{"server01", "useast"}),
str2iface([]string{"server02", "useast"}),
[]interface{}{1, "server01", ""},
[]interface{}{2, "server01", "uswest"},
[]interface{}{3, "server01", "useast"},
[]interface{}{4, "server02", "useast"},
},
},
},
Expand All @@ -1726,9 +1703,9 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", "uswest"}),
[]interface{}{2, "server01", "uswest"},
},
},
},
Expand All @@ -1746,9 +1723,9 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "gpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server03", "caeast"}),
[]interface{}{6, "server03", "caeast"},
},
},
},
Expand All @@ -1766,9 +1743,9 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "gpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server03", "caeast"}),
[]interface{}{6, "server03", "caeast"},
},
},
},
Expand All @@ -1786,10 +1763,10 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", "useast"}),
str2iface([]string{"server02", "useast"}),
[]interface{}{3, "server01", "useast"},
[]interface{}{4, "server02", "useast"},
},
},
},
Expand Down Expand Up @@ -1819,7 +1796,7 @@ func TestHandler_serveShowSeries(t *testing.T) {
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
t.Logf("query #%d: %s", i, tt.q)
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
} else if tt.err == "" && mustMarshalJSON(tt.r) != body {
t.Logf("query #%d: %s", i, tt.q)
t.Logf("exp = %s", mustMarshalJSON(tt.r))
t.Logf("got = %s", body)
Expand Down
43 changes: 37 additions & 6 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,11 +2000,6 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
return &Result{Err: err}
}

// // If OFFSET is past the end of the array, return empty results.
// if stmt.Offset > len(measurements)-1 {
// return &Result{}
// }

// Create result struct that will be populated and returned.
result := &Result{
Series: make(influxql.Rows, 0, len(measurements)),
Expand Down Expand Up @@ -2039,7 +2034,8 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
// Loop through series IDs getting matching tag sets.
for _, id := range ids {
if s, ok := m.seriesByID[id]; ok {
values := make([]interface{}, 0, len(r.Columns))
values := make([]interface{}, 0, len(r.Columns)+1)
values = append(values, id)
for _, column := range r.Columns {
values = append(values, s.Tags[column])
}
Expand All @@ -2048,14 +2044,49 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
r.Values = append(r.Values, values)
}
}
// make the id the first column
r.Columns = append([]string{"id"}, r.Columns...)

// Append the row to the result.
result.Series = append(result.Series, r)
}

if stmt.Limit > 0 || stmt.Offset > 0 {
result.Series = s.filterShowSeriesResult(stmt.Limit, stmt.Offset, result.Series)
}

return result
}

// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
// Unlike limit and offset on SELECT statements, the limit and offset don't apply to the number of Rows, but
// to the number of total Values returned, since each Value represents a unique series.
func (s *Server) filterShowSeriesResult(limit, offset int, rows influxql.Rows) influxql.Rows {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't need to be a method of Server. Or, if left as a method, don't shadow the receiver s below.

var filteredSeries influxql.Rows
seriesCount := 0
for _, r := range rows {
var currentSeries [][]interface{}

// filter the values
for _, v := range r.Values {
if seriesCount >= offset && seriesCount-offset < limit {
currentSeries = append(currentSeries, v)
}
seriesCount++
}

// only add the row back in if there are some values in it
if len(currentSeries) > 0 {
r.Values = currentSeries
filteredSeries = append(filteredSeries, r)
if seriesCount > limit+offset {
return filteredSeries
}
}
}
return filteredSeries
}

func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
Expand Down
68 changes: 61 additions & 7 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ func TestServer_DropMeasurement(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

Expand Down Expand Up @@ -977,7 +977,7 @@ func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]},{"name":"memory","columns":["host","region"],"values":[["serverB","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]},{"name":"memory","columns":["id","host","region"],"values":[[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

Expand Down Expand Up @@ -1012,7 +1012,7 @@ func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["host","region"],"values":[["serverB","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["id","host","region"],"values":[[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

Expand Down Expand Up @@ -1076,7 +1076,7 @@ func TestServer_DropSeries(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

Expand Down Expand Up @@ -1133,7 +1133,7 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["host","region"],"values":[["serverA","uswest"],["serverB","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"],[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

Expand All @@ -1220,7 +1220,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["host","region"],"values":[["serverB","uswest"]]}]}` {
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

Expand Down Expand Up @@ -1328,6 +1328,60 @@ func TestServer_ExecuteQuery(t *testing.T) {
}
}

// Ensure the server respects limit and offset in show series queries
func TestServer_ShowSeriesLimitOffset(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")

// Write series with one point to the database.
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(20)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverB"}, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Fields: map[string]interface{}{"value": float64(30)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west", "host": "serverC"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: map[string]string{"region": "us-west", "host": "serverB"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}})

// Select data from the server.
results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 3 OFFSET 1`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[2,"serverB","us-east"],[3,"serverC","us-west"]]},{"name":"memory","columns":["id","host","region"],"values":[[4,"serverB","us-west"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

// Select data from the server.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 4`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["id","host","region"],"values":[[5,"serverA","us-east"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}

// Select data from the server.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 0 {
t.Fatalf("unexpected row count: %d", len(res.Series))
}

// Select data from the server.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","us-east"],[2,"serverB","us-east"],[3,"serverC","us-west"]]},{"name":"memory","columns":["id","host","region"],"values":[[4,"serverB","us-west"],[5,"serverA","us-east"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}

// Ensure that when querying for raw data values that they return in time order
func TestServer_RawDataReturnsInOrder(t *testing.T) {
s := OpenServer(NewMessagingClient())
Expand Down