Support joining nodes to an existing cluster #3372

Merged: 32 commits merged into master from jw-cluster on Jul 23, 2015
Conversation

Contributor

@jwilder jwilder commented Jul 17, 2015

Overview

This PR adds support for joining a new node to an existing cluster. It implements some of the functionality in #2966.

The way it works is that an existing cluster with an established raft leader must be running. The new node should be started with -join addr:port, where addr is the hostname/IP of any existing member and port is the cluster port (default 8088). The new node will attempt to join the cluster and be assigned a node ID. Future shards that are created will be distributed across the cluster and may be placed on the new node.

Queries and writes can go to any node on any of the standard service ports. Queries are not addressed by this PR; they are currently being worked on in other PRs.

Implementation Details

When there are more than 3 nodes in a cluster, new nodes will not take part in the raft cluster. All of the raft implementation is encapsulated in the meta.Store. To keep the meta.Store implementation from having to check whether it's part of the raft cluster or not, the raft details have been pulled out into a raftState implementation. The meta.Store now delegates raft-related calls to the raftState.

There are two implementations of raftState: localRaft and remoteRaft. localRaft changes the behavior of the meta.Store so that it takes part in the raft cluster. remoteRaft causes the meta.Store to operate using a local cache with remote calls to the raft cluster. In a subsequent PR, nodes will be able to be promoted to raft cluster members, and this state pattern will make it easier to change state dynamically.
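For illustration, a minimal Go sketch of this state pattern follows. Only the localRaft, remoteRaft, and meta.Store names come from this PR; the method set and the Store field layout are assumptions made for the example, not the actual interface.

```go
package meta

// raftState abstracts the raft-specific behavior so meta.Store does not
// have to check whether it is a raft member. The method set here is
// illustrative only.
type raftState interface {
	open() error
	apply(cmd []byte) error // apply a metadata command
	leader() string         // address of the current raft leader, if known
	close() error
}

// localRaft takes part in the raft cluster directly.
type localRaft struct{ /* raft instance, log/peer stores, ... */ }

// remoteRaft keeps a local cache and forwards commands to the raft cluster.
type remoteRaft struct{ /* peer addresses, cached meta.Data, ... */ }

// Store delegates raft-related calls to whichever state it currently holds,
// which is what would later allow a node to be promoted dynamically.
type Store struct {
	state raftState
}

func (s *Store) apply(cmd []byte) error { return s.state.apply(cmd) }
```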

When a node is started with join options, it initiates a JoinRequest RPC to an existing member. If that member is not the raft leader, the request is proxied to the current leader. The response to the JoinRequest indicates the node ID, whether the node should join the raft cluster (currently always false), and the current set of raft peers. If the node should join the raft cluster, it operates as before. If not, the node calls a FetchMetaData RPC to the raft cluster (auto-proxied to the leader) and then starts up.
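A rough sketch of that join handshake, assuming illustrative names (JoinResult, RaftEnabled, the send callback); the actual request/response types in the PR may differ.

```go
package meta

import "fmt"

// JoinResult mirrors the JoinRequest response described above: the assigned
// node ID, whether this node should join the raft cluster (currently always
// false for new nodes), and the current set of raft peers.
type JoinResult struct {
	NodeID      uint64
	RaftEnabled bool
	RaftNodes   []string
}

// joinCluster tries each -join address in turn until one accepts the
// JoinRequest. A member that is not the raft leader proxies the request to
// the current leader, so any existing member is a valid target.
func joinCluster(joinAddrs []string, send func(addr string) (*JoinResult, error)) (*JoinResult, error) {
	for _, addr := range joinAddrs {
		res, err := send(addr)
		if err != nil {
			continue // this member is unreachable; try the next one
		}
		// If RaftEnabled were true, the node would start in the localRaft
		// state; otherwise it issues FetchMetaData and starts as a
		// data-only node.
		return res, nil
	}
	return nil, fmt.Errorf("unable to join cluster via %v", joinAddrs)
}
```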

Changes to the meta-store in the raft cluster need to be propagated to non-raft members. This is handled by blocking FetchMetaData calls initiated by each non-raft member. Each maintains a blocking call that waits for a meta.Data change. When triggered, the updated meta.Data is returned and the non-raft member updates its local cache if needed. The blocking call is then repeated indefinitely. This is similar to how zookeeper/etcd watches work.
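A hedged sketch of the watch loop a non-raft member would run; the Index-based staleness check and the function signatures are assumptions made for the example.

```go
package meta

import (
	"log"
	"time"
)

// Data stands in for meta.Data; Index is assumed here to be a monotonically
// increasing change counter used to detect whether the cache is stale.
type Data struct {
	Index uint64
	// databases, retention policies, shard groups, ...
}

// watchMetaData issues blocking FetchMetaData-style calls in a loop. Each
// call waits until the metadata changes, the returned copy is applied
// idempotently to the local cache, and the call is immediately reissued,
// much like a zookeeper/etcd watch.
func watchMetaData(fetch func(afterIndex uint64) (*Data, error), apply func(*Data)) {
	var index uint64
	for {
		d, err := fetch(index) // blocks on a raft member until meta.Data changes
		if err != nil {
			log.Printf("fetch metadata: %v; retrying against another peer", err)
			time.Sleep(time.Second)
			continue
		}
		if d.Index > index {
			apply(d) // idempotent local cache update
			index = d.Index
		}
	}
}
```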

Not implemented TODO

The following things are still not implemented:

  • Add new raft members - If you have a one-node cluster and add new nodes, the first 3 nodes should eventually become part of the raft cluster. This is more involved and needs more work. For now, all new nodes are data-only nodes.
  • Join retries - If all the join addresses fail, the node will not retry; it needs to be restarted to retry the join.
  • Leader RPC retries - remote calls to the raft cluster use a random member from the set of peers. If that member is down, the call will fail and return an error. Many of these could be retried automatically.
  • Removing nodes (raft or data-only).

@@ -229,8 +229,8 @@ restore uses a snapshot of a data node to rebuild a cluster.

// Config represents a partial config for rebuilding the server.
type Config struct {
Meta meta.Config `toml:"meta"`
Data tsdb.Config `toml:"data"`
Meta *meta.Config `toml:"meta"`
Contributor

Perhaps it will become apparent later, but why this change?

Contributor Author

Needed to set a private join value on the config if it's specified via the command-line.

Contributor

otoolep commented Jul 20, 2015

> Changes to the meta-store in the raft cluster need to be propagated to non-raft members. This is handled by blocking FetchMetaData calls initiated by each non-raft member.

How does this interact with the cached MetaStore we have/used-to-have? Are they completely different paths? And serve different purposes?

Contributor Author

jwilder commented Jul 20, 2015

@otoolep We still have the caching meta-store. The implementation differences are separated out into raftStates; the meta.Store implementation is the same and delegates raft-related calls to the raftState. It behaves differently depending on which state it is in.

node, err := func() (*NodeInfo, error) {
	// attempt to create the node
	node, err := r.store.CreateNode(*req.Addr)
	// if it exists, return the exting node
Contributor

Nit: exting -> existing

Contributor

otoolep commented Jul 20, 2015

Took a second look, generally makes sense. I think @benbjohnson really needs to chime in as well though.

ClusterTracing bool `toml:"cluster-tracing"`

// The join command-line argument
join string
Contributor

Why is this unexported?

Contributor Author

It's set as a command-line arg. If it's public, influxd config lists it as a config option, which is not valid.

Contributor

Can you do toml:"-"?

Contributor Author

I'll try. But the Config still needs to be mutable.
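For reference, the suggestion would look roughly like the sketch below; whether the TOML encoder used by influxd config actually skips the field here, and whether an exported field is otherwise acceptable, is exactly what would need to be tried.

```go
package meta

// Sketch of the suggested alternative: export the field but tag it with
// toml:"-" so it is skipped by TOML encoding/decoding while remaining
// settable (mutable) from the -join command-line flag.
type Config struct {
	ClusterTracing bool   `toml:"cluster-tracing"`
	Join           string `toml:"-"` // populated from -join, not the config file
}
```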

NodeByHost(host string) (*NodeInfo, error)
WaitForDataChanged() error
}
}
Contributor

Why export RPC but not tracingEnabled or store? It seems like they could all be exported.

Contributor Author

Ah right.. I think I intended for all of this to be private at one point, but some of it ended up public inadvertently. I'll fix it up and either make it all private or all public.


Jhors2 commented Jul 22, 2015

Can you explain a bit further how data is sharded after a new data node joins? One very common use case for this is when your current cluster is running out of disk space. Is it at all possible to re-shard the data slowly over time to balance total disk utilization across the entire cluster?

Contributor Author

jwilder commented Jul 22, 2015

@Jhors2 When new nodes are added, they become part of the pool of nodes that shards can be assigned to. Existing shards are not automatically rebalanced or moved. New shards (created when new shard groups are created) will be assigned to both new and existing nodes.
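A minimal sketch of that idea, assuming a simple round-robin owner selection; the actual shard placement logic in meta.Store may differ.

```go
package meta

// NodeInfo stands in for the node metadata the meta store tracks.
type NodeInfo struct {
	ID   uint64
	Host string
}

// assignShardOwners illustrates how a new shard draws its owners from the
// full node pool (existing and newly joined nodes alike). Existing shards
// are untouched; only shards created for new shard groups are spread this
// way. The round-robin offset is an assumption for the example.
func assignShardOwners(nodes []NodeInfo, replicaN, shardIndex int) []NodeInfo {
	if len(nodes) == 0 {
		return nil
	}
	if replicaN > len(nodes) {
		replicaN = len(nodes)
	}
	owners := make([]NodeInfo, 0, replicaN)
	for i := 0; i < replicaN; i++ {
		owners = append(owners, nodes[(shardIndex+i)%len(nodes)])
	}
	return owners
}
```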

Non-raft nodes need to be notified when the metastore changes. For
example, a database could be dropped on node 1 (non-raft) and node 2
would not know. Since queries for that database would still hit node 2's
cache (not a cache miss), node 2 would never get updated.

To propagate changes to non-raft nodes, each non-raft node maintains
a blocking connection to a raft node that blocks until a metadata
change occurs. When the change is triggered, the updated metadata
is returned to the client and the client idempotently updates its local
cache. It then reconnects and waits for another change. This is
similar to watches in zookeeper or etcd. Since the blocking request is
always recreated, it also serves as a polling mechanism that will retry
another raft member if the current connection is lost.
store.go is getting big.
Not needed since it was just used as a safeguard for seeing if we
are the leader.
Will try each once until one succeeds
Useful for troubleshooting but too verbose for regular use.
jwilder added a commit that referenced this pull request Jul 23, 2015
Support joining nodes to an existing cluster
@jwilder jwilder merged commit 7e56a54 into master Jul 23, 2015
@jwilder jwilder deleted the jw-cluster branch July 23, 2015 16:32