Skip to content

Commit

Permalink
Merge pull request #5706 from influxdata/jw-cluster
Browse files Browse the repository at this point in the history
Cluster Setup
  • Loading branch information
jwilder committed Feb 18, 2016
2 parents ae7b027 + 04ba794 commit 26163cb
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 47 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
- [#5666](https://github.com/influxdata/influxdb/pull/5666): Manage dependencies with gdm
- [#5512](https://github.com/influxdata/influxdb/pull/5512): HTTP: Add config option to enable HTTP JSON write path which is now disabled by default.
- [#5336](https://github.com/influxdata/influxdb/pull/5366): Enabled golint for influxql. @gabelev
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup

### Bugfixes

Expand Down
18 changes: 17 additions & 1 deletion cmd/influxd/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,19 @@ func (cmd *Command) Run(args ...string) error {
return fmt.Errorf("apply env config: %v", err)
}

// Command-line flags for -join and -hostname override the config
// and env variable
if options.Join != "" {
config.Meta.JoinPeers = strings.Split(options.Join, ",")
}

if options.Hostname != "" {
config.Hostname = options.Hostname
}

// Propagate the top-level hostname down to dependent configs
config.Meta.RemoteHostname = config.Hostname

// Validate the configuration.
if err := config.Validate(); err != nil {
return fmt.Errorf("%s. To generate a valid configuration file run `influxd config > influxdb.generated.conf`", err)
Expand Down Expand Up @@ -156,6 +165,7 @@ func (cmd *Command) ParseFlags(args ...string) (Options, error) {
fs.StringVar(&options.ConfigPath, "config", "", "")
fs.StringVar(&options.PIDFile, "pidfile", "", "")
fs.StringVar(&options.Join, "join", "", "")
fs.StringVar(&options.Hostname, "hostname", "", "")
fs.StringVar(&options.CPUProfile, "cpuprofile", "", "")
fs.StringVar(&options.MemProfile, "memprofile", "", "")
fs.Usage = func() { fmt.Fprintln(cmd.Stderr, usage) }
Expand Down Expand Up @@ -215,7 +225,12 @@ then a new cluster will be initialized unless the -join argument is used.
Set the path to the configuration file.
-join <host:port>
Joins the server to an existing cluster. Should be the HTTP bind address of an existing meta server
Joins the server to an existing cluster. Should be
the HTTP bind address of an existing meta server
-hostname <name>
Override the hostname, the 'hostname' configuration
option will be overridden.
-pidfile <path>
Write process ID to a file.
Expand All @@ -232,6 +247,7 @@ type Options struct {
ConfigPath string
PIDFile string
Join string
Hostname string
CPUProfile string
MemProfile string
}
4 changes: 4 additions & 0 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type Config struct {

// BindAddress is the address that all TCP services use (Raft, Snapshot, Cluster, etc.)
BindAddress string `toml:"bind-address"`

// Hostname is the hostname portion to use when registering local
// addresses. This hostname must be resolvable from other nodes.
Hostname string `toml:"hostname"`
}

// NewConfig returns an instance of Config with reasonable defaults.
Expand Down
39 changes: 26 additions & 13 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,6 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
return nil, fmt.Errorf("must run as either meta node or data node or both")
}

httpBindAddress, err := meta.DefaultHost(DefaultHostname, c.HTTPD.BindAddress)
if err != nil {
return nil, err
}
tcpBindAddress, err := meta.DefaultHost(DefaultHostname, bind)
if err != nil {
return nil, err
}

s := &Server{
buildInfo: *buildInfo,
err: make(chan error),
Expand All @@ -186,9 +177,9 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
joinPeers: c.Meta.JoinPeers,
metaUseTLS: c.Meta.HTTPSEnabled,

httpAPIAddr: httpBindAddress,
httpAPIAddr: c.HTTPD.BindAddress,
httpUseTLS: c.HTTPD.HTTPSEnabled,
tcpAddr: tcpBindAddress,
tcpAddr: bind,

config: c,
}
Expand Down Expand Up @@ -651,11 +642,11 @@ func (s *Server) initializeMetaClient() error {
return nil
}

n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
for err != nil {
log.Printf("Unable to create data node. retry in 1s: %s", err.Error())
time.Sleep(time.Second)
n, err = s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
}
s.Node.ID = n.ID

Expand All @@ -666,6 +657,28 @@ func (s *Server) initializeMetaClient() error {
return nil
}

// HTTPAddr reports the address that other nodes should use when sending
// HTTP queries and writes to this server. It is derived from the server's
// HTTP API bind address by way of remoteAddr.
func (s *Server) HTTPAddr() string {
	advertised := s.remoteAddr(s.httpAPIAddr)
	return advertised
}

// TCPAddr reports the address that other nodes should use for cluster
// communication with this server. It is derived from the server's TCP
// bind address by way of remoteAddr.
func (s *Server) TCPAddr() string {
	advertised := s.remoteAddr(s.tcpAddr)
	return advertised
}

// remoteAddr passes addr through meta.DefaultHost together with the
// hostname from the server configuration, falling back to
// meta.DefaultHostname when no hostname is configured. If meta.DefaultHost
// reports an error, addr is returned unchanged.
func (s *Server) remoteAddr(addr string) string {
	host := meta.DefaultHostname
	if configured := s.config.Hostname; configured != "" {
		host = configured
	}

	if resolved, err := meta.DefaultHost(host, addr); err == nil {
		return resolved
	}

	// Best effort: hand back the address exactly as it was given.
	return addr
}

// MetaServers returns the meta node HTTP addresses used by this server.
func (s *Server) MetaServers() []string {
return s.MetaClient.MetaServers()
Expand Down
17 changes: 16 additions & 1 deletion services/meta/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,9 @@ func (c *Client) JoinMetaServer(httpAddr, tcpAddr string) error {
c.mu.RLock()

if currentServer >= len(c.metaServers) {
// We've tried every server, wait a second before
// trying again
time.Sleep(time.Second)
currentServer = 0
}
server := c.metaServers[currentServer]
Expand Down Expand Up @@ -1014,6 +1017,10 @@ func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.Extension
continue
}

if _, ok := err.(errCommand); ok {
return err
}

time.Sleep(errSleep)
}
}
Expand Down Expand Up @@ -1055,7 +1062,7 @@ func (c *Client) exec(url string, typ internal.Command_Type, desc *proto.Extensi
}
es := res.GetError()
if es != "" {
return 0, fmt.Errorf(es)
return 0, errCommand{msg: es}
}

return res.GetIndex(), nil
Expand Down Expand Up @@ -1252,6 +1259,14 @@ func (e errRedirect) Error() string {
return fmt.Sprintf("redirect to %s", e.host)
}

// errCommand is an error carrying the error string reported in a command
// response. Having a distinct type lets callers tell it apart from other
// failures with a type assertion.
type errCommand struct{ msg string }

// Error implements the error interface and returns the stored message
// verbatim.
func (e errCommand) Error() string { return e.msg }

type uint64Slice []uint64

func (a uint64Slice) Len() int { return len(a) }
Expand Down
21 changes: 6 additions & 15 deletions services/meta/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Config struct {
Enabled bool `toml:"enabled"`
Dir string `toml:"dir"`

// RemoteHostname is the hostname portion to use when registering meta node
// addresses. This hostname must be resolvable from other nodes.
RemoteHostname string `toml:"-"`

// this is deprecated. Should use the address from run/config.go
BindAddress string `toml:"bind-address"`

Expand All @@ -57,7 +61,7 @@ type Config struct {
HTTPSCertificate string `toml:"https-certificate"`

// JoinPeers if specified gives other metastore servers to join this server to the cluster
JoinPeers []string `toml:"-"`
JoinPeers []string `toml:"join"`
RetentionAutoCreate bool `toml:"retention-autocreate"`
ElectionTimeout toml.Duration `toml:"election-timeout"`
HeartbeatTimeout toml.Duration `toml:"heartbeat-timeout"`
Expand Down Expand Up @@ -85,6 +89,7 @@ func NewConfig() *Config {
RaftPromotionEnabled: DefaultRaftPromotionEnabled,
LeaseDuration: toml.Duration(DefaultLeaseDuration),
LoggingEnabled: DefaultLoggingEnabled,
JoinPeers: []string{},
}
}

Expand All @@ -103,20 +108,6 @@ func (c *Config) defaultHost(addr string) string {
return address
}

// DefaultedBindAddress returns BindAddress after it has been passed
// through defaultHost. Per the original documentation, an address without
// a host part is completed with the host's name (or "localhost" if that
// cannot be determined), and an address that already names a host is
// returned as is.
func (c *Config) DefaultedBindAddress() string {
	normalized := c.defaultHost(c.BindAddress)
	return normalized
}

// DefaultedHTTPBindAddress returns HTTPBindAddress after it has been
// passed through defaultHost. Per the original documentation, an address
// without a host part is completed with the host's name (or "localhost" if
// that cannot be determined), and an address that already names a host is
// returned as is.
func (c *Config) DefaultedHTTPBindAddress() string {
	normalized := c.defaultHost(c.HTTPBindAddress)
	return normalized
}

func DefaultHost(hostname, addr string) (string, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
Expand Down
41 changes: 37 additions & 4 deletions services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,29 @@ func (data *Data) CreateDataNode(host, tcpHost string) error {
}
}

// If an existing meta node exists with the same TCPHost address,
// then these nodes are actually the same so re-use the existing ID
var existingID uint64
for _, n := range data.MetaNodes {
if n.TCPHost == tcpHost {
existingID = n.ID
break
}
}

// We didn't find an existing node, so assign it a new node ID
if existingID == 0 {
data.MaxNodeID++
existingID = data.MaxNodeID
}

// Append new node.
data.MaxNodeID++
data.DataNodes = append(data.DataNodes, NodeInfo{
ID: data.MaxNodeID,
ID: existingID,
Host: host,
TCPHost: tcpHost,
})
sort.Sort(NodeInfos(data.DataNodes))

return nil
}
Expand Down Expand Up @@ -151,14 +167,31 @@ func (data *Data) CreateMetaNode(httpAddr, tcpAddr string) error {
}
}

// If an existing data node exists with the same TCPHost address,
// then these nodes are actually the same so re-use the existing ID
var existingID uint64
for _, n := range data.DataNodes {
if n.TCPHost == tcpAddr {
existingID = n.ID
break
}
}

// We didn't find an existing data node ID, so assign a new ID
// to this meta node.
if existingID == 0 {
data.MaxNodeID++
existingID = data.MaxNodeID
}

// Append new node.
data.MaxNodeID++
data.MetaNodes = append(data.MetaNodes, NodeInfo{
ID: data.MaxNodeID,
ID: existingID,
Host: httpAddr,
TCPHost: tcpAddr,
})

sort.Sort(NodeInfos(data.MetaNodes))
return nil
}

Expand Down
27 changes: 18 additions & 9 deletions services/meta/raft_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,6 @@ func (r *raftState) open(s *store, ln net.Listener, initializePeers []string) er
peers = []string{r.addr}
}

// If we have multiple nodes in the cluster, make sure our address is in the raft peers or
// we won't be able to boot into the cluster because the other peers will reject our new hostname. This
// is difficult to resolve automatically because we need to have all the raft peers agree on the current members
// of the cluster before we can change them.
if len(peers) > 0 && !raft.PeerContained(peers, r.addr) {
r.logger.Printf("%s is not in the list of raft peers. Please ensure all nodes have the same meta nodes configured", r.addr)
return fmt.Errorf("peers out of sync: %v not in %v", r.addr, peers)
}

// Create the log store and stable store.
store, err := raftboltdb.NewBoltStore(filepath.Join(r.path, "raft.db"))
if err != nil {
Expand Down Expand Up @@ -242,6 +233,24 @@ func (r *raftState) removePeer(addr string) error {
if !r.isLeader() {
return raft.ErrNotLeader
}

peers, err := r.peerStore.Peers()
if err != nil {
return err
}

var exists bool
for _, p := range peers {
if addr == p {
exists = true
break
}
}

if !exists {
return nil
}

if fut := r.raft.RemovePeer(addr); fut.Error() != nil {
return fut.Error()
}
Expand Down
20 changes: 16 additions & 4 deletions services/meta/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type Service struct {
func NewService(c *Config) *Service {
s := &Service{
config: c,
httpAddr: c.DefaultedHTTPBindAddress(),
raftAddr: c.DefaultedBindAddress(),
httpAddr: c.HTTPBindAddress,
raftAddr: c.BindAddress,
https: c.HTTPSEnabled,
cert: c.HTTPSCertificate,
err: make(chan error),
Expand Down Expand Up @@ -110,8 +110,8 @@ func (s *Service) Open() error {
return err
}

// Open the store
s.store = newStore(s.config, s.httpAddr, s.raftAddr)
// Open the store. The addresses passed in are remotely accessible.
s.store = newStore(s.config, s.remoteAddr(s.httpAddr), s.remoteAddr(s.raftAddr))

handler := newHandler(s.config, s)
handler.logger = s.Logger
Expand All @@ -128,6 +128,18 @@ func (s *Service) Open() error {
return nil
}

// remoteAddr passes addr through DefaultHost together with the service
// configuration's RemoteHostname, falling back to DefaultHostname when
// RemoteHostname is empty. If DefaultHost reports an error, addr is
// returned unchanged.
func (s *Service) remoteAddr(addr string) string {
	name := DefaultHostname
	if s.config.RemoteHostname != "" {
		name = s.config.RemoteHostname
	}

	advertised, err := DefaultHost(name, addr)
	if err == nil {
		return advertised
	}

	// Best effort: fall back to the address exactly as it was given.
	return addr
}

// serve serves the handler from the listener.
func (s *Service) serve() {
// The listener was closed so exit
Expand Down

0 comments on commit 26163cb

Please sign in to comment.