Skip to content

Commit

Permalink
Merge pull request #2410 from influxdb/increase_election_timeout
Browse files Browse the repository at this point in the history
Allow Raft timeouts to be configured
  • Loading branch information
otoolep committed Apr 23, 2015
2 parents e9f5eaa + 86db6df commit a37752c
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

### Features
- [#2398](https://github.com/influxdb/influxdb/pull/2398) Track more stats and report errors for shards.
- [#2410](https://github.com/influxdb/influxdb/pull/2410) Allow Raft time configuration

### Bugfixes
- [#2370](https://github.com/influxdb/influxdb/pull/2370): Fix data race in openTSDB endpoint.
Expand Down
33 changes: 32 additions & 1 deletion cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ const (
// DefaultMaxTopicSize is the default maximum size in bytes a segment can consume on disk of a broker.
DefaultBrokerMaxSegmentSize = 10 * 1024 * 1024

// DefaultRaftApplyInterval is the period between applying commited Raft log entries.
DefaultRaftApplyInterval = 10 * time.Millisecond

// DefaultRaftElectionTimeout is the default Leader Election timeout.
DefaultRaftElectionTimeout = 1 * time.Second

// DefaultRaftHeartbeatInterval is the interval between leader heartbeats.
DefaultRaftHeartbeatInterval = 100 * time.Millisecond

// DefaultRaftReconnectTimeout is the default wait time between reconnections.
DefaultRaftReconnectTimeout = 10 * time.Millisecond

// DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite"

Expand Down Expand Up @@ -108,12 +120,19 @@ var DefaultSnapshotURL = url.URL{
type Broker struct {
Dir string `toml:"dir"`
Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"`
TruncationInterval Duration `toml:"truncation-interval"`
MaxTopicSize int64 `toml:"max-topic-size"`
MaxSegmentSize int64 `toml:"max-segment-size"`
}

// Raft represents the Raft configuration for broker nodes.
type Raft struct {
ApplyInterval Duration `toml:"apply-interval"`
ElectionTimeout Duration `toml:"election-timeout"`
HeartbeatInterval Duration `toml:"heartbeat-interval"`
ReconnectTimeout Duration `toml:"reconnect-timeout"`
}

// Snapshot represents the configuration for a snapshot service. Snapshot configuration
// is only valid for data nodes.
type Snapshot struct {
Expand Down Expand Up @@ -178,6 +197,8 @@ type Config struct {

Broker Broker `toml:"broker"`

Raft Raft `toml:"raft"`

Data Data `toml:"data"`

Snapshot Snapshot `toml:"snapshot"`
Expand Down Expand Up @@ -260,6 +281,11 @@ func NewConfig() *Config {
c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize
c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize

c.Raft.ApplyInterval = Duration(DefaultRaftApplyInterval)
c.Raft.ElectionTimeout = Duration(DefaultRaftElectionTimeout)
c.Raft.HeartbeatInterval = Duration(DefaultRaftHeartbeatInterval)
c.Raft.ReconnectTimeout = Duration(DefaultRaftReconnectTimeout)

// FIX(benbjohnson): Append where the udp servers are actually used.
// config.UDPServers = append(config.UDPServers, UDPInputConfig{
// Enabled: tomlConfiguration.InputPlugins.UDPInput.Enabled,
Expand All @@ -284,6 +310,11 @@ func NewTestConfig() (*Config, error) {
c.Broker.Enabled = true
c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")

c.Raft.ApplyInterval = Duration(DefaultRaftApplyInterval)
c.Raft.ElectionTimeout = Duration(DefaultRaftElectionTimeout)
c.Raft.HeartbeatInterval = Duration(DefaultRaftHeartbeatInterval)
c.Raft.ReconnectTimeout = Duration(DefaultRaftReconnectTimeout)

c.Data.Enabled = true
c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")

Expand Down
9 changes: 8 additions & 1 deletion cmd/influxd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,10 @@ enabled = false
# Where the broker logs are stored. The user running InfluxDB will need read/write access.
dir = "/tmp/influxdb/development/broker"
# election-timeout = "2s"
# Raft distributed consensus
[raft]
apply-interval = "10ms"
election-timeout = "1s"
[data]
dir = "/tmp/influxdb/development/db"
Expand Down Expand Up @@ -252,6 +255,10 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("broker disabled mismatch: %v, got: %v", false, c.Broker.Enabled)
}

if c.Raft.ApplyInterval != main.Duration(10*time.Millisecond) {
t.Fatalf("Raft apply interval mismatch: %v, got %v", 10*time.Millisecond, c.Raft.ApplyInterval)
}

if c.Data.Dir != "/tmp/influxdb/development/db" {
t.Fatalf("data dir mismatch: %v", c.Data.Dir)
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,14 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL, h *Handler) {
b.Log = l
cmd.node.raftLog = l

// Create Raft clock.
clk := raft.NewClock()
clk.ApplyInterval = time.Duration(cmd.config.Raft.ApplyInterval)
clk.ElectionTimeout = time.Duration(cmd.config.Raft.ElectionTimeout)
clk.HeartbeatInterval = time.Duration(cmd.config.Raft.HeartbeatInterval)
clk.ReconnectTimeout = time.Duration(cmd.config.Raft.ReconnectTimeout)
l.Clock = clk

// Open broker so it can feed last index data to the log.
if err := b.Open(path); err != nil {
log.Fatalf("failed to open broker at %s : %s", path, err)
Expand Down
7 changes: 7 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ truncation-interval = "10m"
max-topic-size = 1073741824
max-segment-size = 10485760

# Raft configuration. Controls the distributed consensus system.
[raft]
apply-interval = "10ms"
election-timeout = "1s"
heartbeat-interval = "100ms"
reconnect-timeout = "10ms"

# Data node configuration. Data nodes are where the time-series data, in the form of
# shards, is stored.
[data]
Expand Down

0 comments on commit a37752c

Please sign in to comment.