Merge pull request #2202 from influxdb/distributed-queries-m
Distributed Queries
otoolep committed Apr 11, 2015
2 parents d79702b + 5890025 commit 30fc6df
Showing 16 changed files with 705 additions and 176 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
## v0.9.0-rc23 [Unreleased]

### Features
- [#2202](https://github.com/influxdb/influxdb/pull/2202): Initial implementation of Distributed Queries
- [#2202](https://github.com/influxdb/influxdb/pull/2202): 64-bit Series IDs. INCOMPATIBLE WITH PREVIOUS DATASTORES.

### Bugfixes
- [#2225](https://github.com/influxdb/influxdb/pull/2225): Make keywords completely case insensitive
- [#2228](https://github.com/influxdb/influxdb/pull/2228): Accept keyword default unquoted in ALTER RETENTION POLICY statement
1 change: 1 addition & 0 deletions cmd/influxd/handler.go
@@ -58,6 +58,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

if strings.HasPrefix(r.URL.Path, "/data_nodes") ||
strings.HasPrefix(r.URL.Path, "/process_continuous_queries") ||
strings.HasPrefix(r.URL.Path, "/run_mapper") ||
strings.HasPrefix(r.URL.Path, "/metastore") {
h.serveMetadata(w, r)
return
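The new /run_mapper prefix joins the endpoints that are handed to the metadata handler, presumably so that remote nodes can be asked to run mappers for distributed queries. A minimal illustrative sketch (not part of this commit; it uses a stand-in function rather than the real Handler) of the same prefix dispatch:

package main

import (
	"fmt"
	"strings"
)

// dispatch mirrors the prefix check above: metadata-style endpoints,
// now including /run_mapper, are served separately from ordinary
// data reads and writes.
func dispatch(path string) string {
	if strings.HasPrefix(path, "/data_nodes") ||
		strings.HasPrefix(path, "/process_continuous_queries") ||
		strings.HasPrefix(path, "/run_mapper") ||
		strings.HasPrefix(path, "/metastore") {
		return "metadata"
	}
	return "data"
}

func main() {
	for _, p := range []string{"/run_mapper", "/write", "/query"} {
		fmt.Println(p, "->", dispatch(p))
	}
}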
66 changes: 44 additions & 22 deletions cmd/influxd/server_integration_test.go
@@ -156,9 +156,9 @@ func createDatabase(t *testing.T, testName string, nodes Cluster, database strin

// createRetentionPolicy creates a retention policy and verifies that the creation was successful.
// The retention policy is created with the given replication factor.
func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string) {
func createRetentionPolicy(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
t.Logf("Creating retention policy %s for database %s", retention, database)
command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes))
command := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, replicationFactor)
query(t, nodes[:1], "", command, `{"results":[{}]}`, "")
}
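A small hypothetical sketch (not part of this commit) of how the widened helper is driven; Cluster, createDatabase, and createRetentionPolicy are the helpers defined in this file, and the database and retention names are illustrative:

func setupPartiallyReplicated(t *testing.T, nodes Cluster) {
	// A replication factor below len(nodes) leaves some nodes without a
	// copy of each shard, which is what the partial-replication test
	// later in this file exercises.
	createDatabase(t, "partial-replication example", nodes, "mydb")
	createRetentionPolicy(t, "partial-replication example", nodes, "mydb", "myrp", len(nodes)-1)
}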

@@ -281,12 +281,16 @@ var limitAndOffset = func(t *testing.T, node *TestNode, database, retention stri
}
}

func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, database, retention string) {
func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
// skip this test if they're just looking to run some of the data tests
if os.Getenv("TEST_PREFIX") != "" {
return
}
t.Logf("Running %s:rawDataReturnsInOrder against %d-node cluster", testName, len(nodes))

// Start by ensuring database and retention policy exist.
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor)
numPoints := 500
var expected string

@@ -297,7 +301,7 @@ func runTest_rawDataReturnsInOrder(t *testing.T, testName string, nodes Cluster,
}

expected = fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",%d]]}]}]}`, numPoints-1)
got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 60*time.Second)
got, ok := queryAndWait(t, nodes, database, `SELECT count(value) FROM cpu`, expected, "", 120*time.Second)
if !ok {
t.Errorf("test %s:rawDataReturnsInOrder failed, SELECT count() query returned unexpected data\nexp: %s\n, got: %s", testName, expected, got)
}
@@ -347,15 +351,15 @@ func runTests_Errors(t *testing.T, nodes Cluster) {
}

// runTestsData tests write and query of data. Setting testNumbers allows only a subset of tests to be run.
func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string) {
func runTestsData(t *testing.T, testName string, nodes Cluster, database, retention string, replicationFactor int) {
t.Logf("Running tests against %d-node cluster", len(nodes))

yesterday := time.Now().Add(-1 * time.Hour * 24).UTC()
now := time.Now().UTC()

// Start by ensuring database and retention policy exist.
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor)

// The tests. Within these tests %DB% and %RP% will be replaced with the database and retention passed into
// this function.
@@ -558,7 +562,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
{"name": "load", "timestamp": "2000-01-01T00:00:10Z", "tags": {"region": "us-east", "host": "serverB"}, "fields": {"value": 30}},
{"name": "load", "timestamp": "2000-01-01T00:00:00Z", "tags": {"region": "us-west", "host": "serverC"}, "fields": {"value": 100}}
]}`,
query: `SELECT sum(value) FROM load GROUP BY time(10s), region, host`,
query: `SELECT sum(value) FROM load GROUP BY region, host`,
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"load","tags":{"host":"serverA","region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",20]]},{"name":"load","tags":{"host":"serverB","region":"us-east"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",30]]},{"name":"load","tags":{"host":"serverC","region":"us-west"},"columns":["time","sum"],"values":[["1970-01-01T00:00:00Z",100]]}]}]}`,
},
@@ -625,9 +629,9 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
},
{
name: "wildcard GROUP BY queries with time",
query: `SELECT mean(value) FROM cpu GROUP BY *,time(1m)`,
query: `SELECT mean(value) FROM cpu WHERE time >= '2000-01-01T00:00:00Z' AND time < '2000-01-01T00:01:00Z' GROUP BY *,time(1m)`,
queryDb: "%DB%",
expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",15]]},{"name":"cpu","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",30]]}]}]}`,
expected: `{"results":[{"series":[{"name":"cpu","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",15]]},{"name":"cpu","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["2000-01-01T00:00:00Z",30]]}]}]}`,
},

// WHERE tag queries
@@ -1307,7 +1311,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
t.Logf(`resetting for test "%s"`, name)
deleteDatabase(t, testName, nodes, database)
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retention)
createRetentionPolicy(t, testName, nodes, database, retention, replicationFactor)
}

if tt.write != "" {
@@ -1354,8 +1358,8 @@ func TestSingleServer(t *testing.T) {
nodes := createCombinedNodeCluster(t, testName, dir, 1, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp")
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp")
runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))
}

func Test3NodeServer(t *testing.T) {
@@ -1372,9 +1376,27 @@ func Test3NodeServer(t *testing.T) {
nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp")
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp")
runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes))
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes))

}

// ensure that all queries work if there are more nodes in a cluster than the replication factor
func Test3NodeClusterPartiallyReplicated(t *testing.T) {
testName := "3-node server integration"
if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
dir := tempfile()
defer func() {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
}

func TestClientLibrary(t *testing.T) {
@@ -1478,7 +1500,7 @@ func TestClientLibrary(t *testing.T) {
test.rp = "myrp"
}
createDatabase(t, testName, nodes, test.db)
createRetentionPolicy(t, testName, nodes, test.db, test.rp)
createRetentionPolicy(t, testName, nodes, test.db, test.rp, len(nodes))
t.Logf("testing %s - %s\n", testName, test.name)
for _, w := range test.writes {
writeResult, err := c.Write(w.bp)
@@ -1533,7 +1555,7 @@ func Test_ServerSingleGraphiteIntegration(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
@@ -1584,7 +1606,7 @@ func Test_ServerSingleGraphiteIntegration_FractionalTime(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
@@ -1636,7 +1658,7 @@ func Test_ServerSingleGraphiteIntegration_ZeroDataPoint(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
createRetentionPolicy(t, testName, nodes, "graphite", "raw", len(nodes))

// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
@@ -1747,7 +1769,7 @@ func Test_ServerOpenTSDBIntegration(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))

// Connect to the OpenTSDB endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
@@ -1798,7 +1820,7 @@ func Test_ServerOpenTSDBIntegration_WithTags(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))

// Connect to the OpenTSDB endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
@@ -1852,7 +1874,7 @@ func Test_ServerOpenTSDBIntegration_BadData(t *testing.T) {
defer nodes.Close()

createDatabase(t, testName, nodes, "opentsdb")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw")
createRetentionPolicy(t, testName, nodes, "opentsdb", "raw", len(nodes))

// Connect to the OpenTSDB endpoint we just spun up
conn, err := net.Dial("tcp", o.ListenAddress(c.BindAddress))
2 changes: 1 addition & 1 deletion commands.go
@@ -202,7 +202,7 @@ func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement,

type dropSeriesCommand struct {
Database string `json:"database"`
SeriesByMeasurement map[string][]uint32 `json:"seriesIds"`
SeriesByMeasurement map[string][]uint64 `json:"seriesIds"`
}
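A minimal standalone sketch (not part of this commit) of how the widened command serializes; the struct mirrors dropSeriesCommand above, and the database name and IDs are illustrative:

package main

import (
	"encoding/json"
	"fmt"
)

// dropSeries mirrors dropSeriesCommand above, including its json tags.
type dropSeries struct {
	Database            string              `json:"database"`
	SeriesByMeasurement map[string][]uint64 `json:"seriesIds"`
}

func main() {
	cmd := dropSeries{
		Database: "mydb",
		SeriesByMeasurement: map[string][]uint64{
			// IDs above 32 bits can now be represented at all; the old
			// []uint32 element type could not hold them.
			"cpu": {1, 1 << 40},
		},
	}
	b, err := json.Marshal(&cmd)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"database":"mydb","seriesIds":{"cpu":[1,1099511627776]}}
}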

// createContinuousQueryCommand is the raft command for creating a continuous query on a database