
Handle distributed queries when shards != data nodes #2336

Merged 4 commits into master on Apr 20, 2015
Conversation

@jwilder (Contributor) commented Apr 18, 2015

This PR has 3 changes:

  1. There was previously an explicit panic in the query engine to prevent queries where the number of shards was not equal to the number of data nodes in the cluster. It was a placeholder awaiting the distributed queries branch, and was not removed when that branch landed. This PR removes that panic.
  2. Prefers local shard data over remote data when deciding whether to use a LocalMapper or a RemoteMapper, preventing unnecessary network traffic (a rough sketch of this preference follows below).
  3. Changes the order of closing the broker and raft log to avoid a panic when closing the server.

Fixes #2272
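
A minimal sketch of change (2), under a simplified shard model: only the names Shard, LocalMapper, and RemoteMapper come from this PR; the fields, constructors, and helper below are hypothetical stand-ins rather than the actual influxdb code.

```go
package main

import "fmt"

// Hypothetical, simplified types standing in for influxdb's shard metadata;
// only the names Shard, LocalMapper, and RemoteMapper appear in the PR itself.
type Shard struct {
	ID          uint64
	DataNodeIDs []uint64 // nodes that hold a replica of this shard
}

type Mapper interface{ Describe() string }

type LocalMapper struct{ shard *Shard }
type RemoteMapper struct{ shard *Shard }

func (m LocalMapper) Describe() string  { return fmt.Sprintf("local read of shard %d", m.shard.ID) }
func (m RemoteMapper) Describe() string { return fmt.Sprintf("remote fetch of shard %d", m.shard.ID) }

// chooseMapper prefers a locally owned replica so no network round trip is needed.
func chooseMapper(localNodeID uint64, sh *Shard) Mapper {
	for _, owner := range sh.DataNodeIDs {
		if owner == localNodeID {
			return LocalMapper{shard: sh}
		}
	}
	return RemoteMapper{shard: sh}
}

func main() {
	sh := &Shard{ID: 7, DataNodeIDs: []uint64{2, 3}}
	fmt.Println(chooseMapper(2, sh).Describe()) // local read of shard 7
	fmt.Println(chooseMapper(1, sh).Describe()) // remote fetch of shard 7
}
```

The design point is simply that a remote mapper is only used when no replica of the shard lives on the querying node.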

@svscorp commented Apr 18, 2015

👍

@otoolep (Contributor) commented Apr 18, 2015

@jwilder -- one thing you should be aware of is that Test3NodeClusterPartiallyReplicated is skipped, so no actual distributed queries are being run (not via that test, anyway). Do you want to try enabling that test and seeing how your code runs? It is the test that is causing problems in CI, so we need to understand why and fix it before we can consider DQ complete.
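
For context, a skipped Go test is usually gated behind an unconditional t.Skip or an environment check, and re-enabling it means removing or relaxing that guard. The snippet below is a generic, hypothetical illustration of that pattern, not the actual Test3NodeClusterPartiallyReplicated body.

```go
package cluster_test

import (
	"os"
	"testing"
)

// Hypothetical stand-in for a skipped cluster test; the real
// Test3NodeClusterPartiallyReplicated body is not shown in this PR.
func TestThreeNodePartiallyReplicated(t *testing.T) {
	if os.Getenv("RUN_CLUSTER_TESTS") == "" {
		// Deleting or relaxing this guard is what "enabling the test" means here.
		t.Skip("set RUN_CLUSTER_TESTS=1 to run the 3-node cluster test")
	}
	// ... start 3 nodes, write with a replication factor of 1, and run distributed queries ...
}
```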

@@ -105,14 +105,14 @@ func (s *Node) Close() error {
 		}
 	}

-	if s.Broker != nil {
-		if err := s.Broker.Close(); err != nil {
+	if s.raftLog != nil {
Review comment (Contributor):
Good catch.

@jwilder force-pushed the 2272-fix branch 2 times, most recently from ec35428 to 9612ea7 on April 19, 2015 at 22:44

-	shard := sg.Shards[0]
+	// pick a shard to query
+	shard := sg.Shards[rand.Intn(len(sg.Shards))]
Review comment (Contributor):
I might be missing something, but I don't see how this can work. Shard groups are created here:

https://github.com/influxdb/influxdb/blob/master/server.go#L1090

If there are 3 nodes and a replication factor of 1, then 3 shards are created on the cluster, each with different data (there is no replication; sharding takes place purely for write throughput). Therefore, when a shard group is selected for a query, picking only 1 of its shards at random means that 2/3 of the data in that shard group is not queried. This code seems to assume that all shards in a shard group contain the same data, which is not always the case.

Am I missing something? Furthermore I'm pretty sure this whole thing needs to be more complex than this. Say I have to query series IDs 3 & 4, and a certain shard group is the one I want (determined by time). It may be possible that I don't need to query 1 of the shards in the shard group, because I know that data for Series IDs 3 & 4 doesn't exist in that 1 shard, only the other two. I can determine this by reversing the shard routing that takes place at write-time.

If my reasoning is correct, our testing needs work, since it should have caught this but did not. If it's not correct, can you explain what I am missing?
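
A minimal sketch of the "reverse the write-time routing" idea above. It assumes, purely for illustration, that a series is routed to a shard within its group by a modulo on the series ID; if that is the rule used at write time, the query side can apply the same rule to decide which shards in the group can hold the requested series and skip the rest. The types below are simplified stand-ins, not the influxdb implementation.

```go
package main

import "fmt"

type Shard struct{ ID uint64 }

type ShardGroup struct{ Shards []*Shard }

// shardBySeriesID mirrors the assumed write-time routing rule.
func (g *ShardGroup) shardBySeriesID(seriesID uint64) *Shard {
	return g.Shards[int(seriesID)%len(g.Shards)]
}

// shardsForSeries groups the requested series IDs by the shard that owns them,
// so shards that cannot hold any of the requested series are never queried.
func (g *ShardGroup) shardsForSeries(seriesIDs []uint64) map[*Shard][]uint64 {
	m := map[*Shard][]uint64{}
	for _, sid := range seriesIDs {
		sh := g.shardBySeriesID(sid)
		m[sh] = append(m[sh], sid)
	}
	return m
}

func main() {
	g := &ShardGroup{Shards: []*Shard{{ID: 1}, {ID: 2}, {ID: 3}}}
	for sh, sids := range g.shardsForSeries([]uint64{3, 4}) {
		fmt.Printf("query shard %d for series %v\n", sh.ID, sids)
	}
	// With 3 shards, series 3 and 4 land on different shards, and the third
	// shard in the group does not need to be queried at all.
}
```

This is essentially the shape of the later revision of the PR, which groups series IDs per shard via ShardBySeriesID, as shown in the diff further down.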

Review comment:

I am also curious how it will behave with 3 nodes and replicaN = 2.

https://github.com/influxdb/influxdb/blob/master/server.go#L1112

Fixes #2272

There was previously an explicit panic put in the query engine to prevent
queries where the number of shards was not equal to the number of data nodes
in the cluster. This was waiting for the distributed queries branch to land
but was not removed when that landed.
Closing the broker before the raft log can trigger this panic since the
raft log depends on the broker via the FSM.

panic: apply: broker apply: broker already closed

goroutine 29164 [running]:
github.com/influxdb/influxdb/raft.(*Log).applier(0xc20833b040, 0xc20802bd40)
	/Users/jason/go/src/github.com/influxdb/influxdb/raft/log.go:1386 +0x278
created by github.com/influxdb/influxdb/raft.func·002
	/Users/jason/go/src/github.com/influxdb/influxdb/raft/log.go:389 +0x764
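
A minimal sketch of the close-ordering fix described in the commit message above, with simplified stand-ins for the real Node, Broker, and raft.Log (this is not the actual server shutdown code): the raft log applies entries through the broker, so it is closed first to avoid the "broker already closed" panic.

```go
package main

import "fmt"

// component is a hypothetical stand-in for a closable subsystem (broker or raft log).
type component struct{ name string }

func (c *component) Close() error {
	fmt.Println("closing", c.name)
	return nil
}

type node struct {
	broker  *component
	raftLog *component
}

// Close stops the raft log first, then the broker it depends on.
func (n *node) Close() error {
	if n.raftLog != nil {
		if err := n.raftLog.Close(); err != nil {
			return err
		}
	}
	if n.broker != nil {
		if err := n.broker.Close(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	n := &node{broker: &component{"broker"}, raftLog: &component{"raft log"}}
	_ = n.Close() // prints "closing raft log" then "closing broker"
}
```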
shards := map[*Shard][]uint64{}
for _, sid := range t.SeriesIDs {
	shard := sg.ShardBySeriesID(sid)
	shards[shard] = append(shards[shard], sid)
Review comment (Contributor):
OK, very good. You're using "map" as a set.

Review comment (Contributor):
To make it even clearer, you might just like to use struct{} as the value type and store an empty struct{}{} value, since we don't care about the value. Right now sid will be overwritten with newer values, so it's of no use.
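
A small illustration of the two idioms being discussed, with hypothetical stand-in types; only Shard and the series-ID slice correspond to names in the diff. A map[*Shard]struct{} fits when only shard membership matters, while the map[*Shard][]uint64 in the final diff keeps the series IDs each shard must answer for, so its values are meaningful.

```go
package main

import "fmt"

type Shard struct{ ID uint64 }

func main() {
	shards := []*Shard{{ID: 1}, {ID: 2}, {ID: 3}}
	seriesIDs := []uint64{3, 4, 6}
	route := func(sid uint64) *Shard { return shards[int(sid)%len(shards)] }

	// Map used purely as a set: the struct{} value carries no information,
	// so nothing meaningful is ever overwritten.
	targets := map[*Shard]struct{}{}
	for _, sid := range seriesIDs {
		targets[route(sid)] = struct{}{}
	}
	fmt.Println("shards to query:", len(targets))

	// Map used as a grouping, as in the final diff: the value is the list of
	// series IDs each shard must answer for, so the values matter here.
	bySeries := map[*Shard][]uint64{}
	for _, sid := range seriesIDs {
		sh := route(sid)
		bySeries[sh] = append(bySeries[sh], sid)
	}
	for sh, sids := range bySeries {
		fmt.Printf("shard %d -> series %v\n", sh.ID, sids)
	}
}
```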

@otoolep (Contributor) commented Apr 20, 2015

Looks good -- I think you've got it. All makes sense to me. +1

@otoolep (Contributor) commented Apr 20, 2015

Don't forget the changelog.

toddboom added a commit that referenced this pull request on Apr 20, 2015:
Handle distributed queries when shards != data nodes
@toddboom merged commit 30b56ce into master on Apr 20, 2015
@toddboom deleted the 2272-fix branch on April 20, 2015 at 20:16
@toddboom (Contributor) commented:

@jwilder merged. I did the changelog on master.

@toddboom restored the 2272-fix branch on April 20, 2015 at 20:20
Successfully merging this pull request may close these issues.

clustering: influxdb 0.9.0-rc23 panics when doing a GET with merge_metrics in a 3 node cluster
4 participants