Skip to content

Commit

Permalink
[IMPROVED] Performance after Raft log snapshot
Browse files Browse the repository at this point in the history
When Raft deletes a lot of entries in the raft log (backed by BoltDB),
the performance of subsequent writes would suffer.
Added some logic to possibly recreate new log with remaining entries
if that number is within some arbitrary limits.
Added logging to report compaction duration, and if that takes more
than 2 seconds, warn the user that the TrailingLogs value should
probably be lowered.

Resolves #520
  • Loading branch information
kozlovic committed Apr 3, 2018
1 parent e4cfd16 commit b8ff40d
Show file tree
Hide file tree
Showing 10 changed files with 725 additions and 692 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,8 +1,8 @@
language: go
sudo: false
go:
- 1.8.x
- 1.9.x
- 1.10.x
install:
- go get -t ./...
- go get github.com/nats-io/gnatsd
Expand Down
42 changes: 26 additions & 16 deletions server/clustering.go
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/nats-io/go-nats"
"github.com/nats-io/nats-streaming-server/spb"
)
Expand Down Expand Up @@ -64,7 +63,7 @@ type raftNode struct {
sync.Mutex
closed bool
*raft.Raft
store *raftboltdb.BoltStore
store *raftLog
transport *raft.NetworkTransport
logInput io.WriteCloser
joinSub *nats.Subscription
Expand Down Expand Up @@ -93,19 +92,32 @@ func (r *raftNode) shutdown() error {
}
r.closed = true
r.Unlock()
if err := r.Raft.Shutdown().Error(); err != nil {
return err
if r.Raft != nil {
if err := r.Raft.Shutdown().Error(); err != nil {
return err
}
}
if err := r.transport.Close(); err != nil {
return err
if r.transport != nil {
if err := r.transport.Close(); err != nil {
return err
}
}
if err := r.store.Close(); err != nil {
return err
if r.store != nil {
if err := r.store.Close(); err != nil {
return err
}
}
if err := r.joinSub.Unsubscribe(); err != nil {
return err
if r.joinSub != nil {
if err := r.joinSub.Unsubscribe(); err != nil {
return err
}
}
if r.logInput != nil {
if err := r.logInput.Close(); err != nil {
return err
}
}
return r.logInput.Close()
return nil
}

// createRaftNode creates and starts a new Raft node.
Expand Down Expand Up @@ -251,10 +263,8 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
// So we want the object to exist so we can check on leader atomic, etc..
s.raft = &raftNode{}

store, err := raftboltdb.New(raftboltdb.Options{
Path: filepath.Join(path, raftLogFile),
NoSync: !s.opts.Clustering.Sync,
})
raftLogFileName := filepath.Join(path, raftLogFile)
store, err := newRaftLog(s.log, raftLogFileName, s.opts.Clustering.Sync, int(s.opts.Clustering.TrailingLogs))
if err != nil {
return false, err
}
Expand Down Expand Up @@ -441,7 +451,7 @@ func (r *raftFSM) Apply(l *raft.Log) interface{} {
}
if err != nil {
panic(fmt.Errorf("failed to store replicated message %d on channel %s: %v",
msg.Sequence, c.name, err))
msg.Sequence, msg.Subject, err))
}
}
return nil
Expand Down

0 comments on commit b8ff40d

Please sign in to comment.