-
Notifications
You must be signed in to change notification settings - Fork 1
/
raft.go
72 lines (61 loc) · 1.39 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main
import (
"io/ioutil"
"log"
"os"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
)
// tickDuration is a fixed 100ms interval. It is not referenced in this part of
// the file; presumably it is the period of the ticker that feeds
// raftNode.ticker — confirm at the construction site.
var tickDuration = 100 * time.Millisecond
// raftNode bundles a raft.Node with the channels, storage and transport that
// its run loop needs.
type raftNode struct {
// Embedded raft node; run reads its Ready channel and calls Advance on it.
// Tick and Step are called directly on raftNode (presumably promoted from
// this embedded value — confirm no local override exists).
raft.Node
// ticker is receive-only; every value received makes run call Tick.
ticker <-chan time.Time
// raftStorage receives the entries of every Ready batch handled by run.
raftStorage *raft.MemoryStorage
// trans is handed the outgoing messages of every Ready batch via Send.
// The transport type is declared elsewhere in the package.
trans transport
// goal is the committed entry index at which run signals on reach.
goal uint64
// reach gets one send from run for each committed entry whose Index equals goal.
reach chan struct{}
// done makes run return when a receive on it succeeds.
done chan struct{}
}
// run executes the node's event loop until a receive on n.done succeeds.
//
// It first creates a write-ahead log inside a fresh temporary directory; the
// WAL is closed and the directory removed when run returns. The loop then
// reacts to three events:
//   - a value on n.ticker: calls n.Tick();
//   - a Ready batch from n.Node.Ready(): saves the hard state and entries to
//     the WAL, appends the entries to n.raftStorage, passes the messages to
//     n.trans.Send, inspects the committed entries, calls Advance, and then
//     sleeps 0.5ms before handling the next event;
//   - a receive on n.done: returns.
//
// Failure to create the directory or the WAL, or to persist a Ready batch, is
// fatal and terminates the process through log.Fatalf.
func (n *raftNode) run() {
	dir, err := ioutil.TempDir("", "raft-bench")
	if err != nil {
		log.Fatalf("raft-bench: cannot create dir for wal (%v)", err)
	}
	defer os.RemoveAll(dir)

	w, err := wal.Create(dir, nil)
	if err != nil {
		// log.Fatalf exits the process without running deferred calls, so the
		// temporary directory has to be removed by hand here.
		os.RemoveAll(dir)
		log.Fatalf("raft-bench: create wal error: %v", err)
	}
	defer w.Close()

	// die performs the deferred cleanups explicitly before exiting, because
	// log.Fatalf would otherwise leave the WAL open and its directory behind.
	die := func(format string, args ...interface{}) {
		w.Close()
		os.RemoveAll(dir)
		log.Fatalf(format, args...)
	}

	for {
		select {
		case <-n.ticker:
			n.Tick()
		case rd := <-n.Node.Ready():
			// Persist before sending messages; a batch that cannot be stored
			// must not be acted upon, so both failures are fatal.
			if err := w.Save(rd.HardState, rd.Entries); err != nil {
				die("raft-bench: save to wal error: %v", err)
			}
			if err := n.raftStorage.Append(rd.Entries); err != nil {
				die("raft-bench: append to raft storage error: %v", err)
			}
			n.trans.Send(rd.Messages)
			for i := range rd.CommittedEntries {
				idx := rd.CommittedEntries[i].Index
				// Every 100th committed index records a nanosecond timestamp
				// in slot [1] of latency (declared elsewhere in the package).
				if idx%100 == 0 {
					latency[idx/100][1] = time.Now().UnixNano()
				}
				// Signal once the target index has been committed.
				if idx == n.goal {
					n.reach <- struct{}{}
				}
			}
			n.Node.Advance()
			// batch for 0.5ms
			time.Sleep(500 * time.Microsecond)
		case <-n.done:
			return
		}
	}
}
// Process hands the incoming message m to n.Step together with ctx and
// returns whatever error Step reports.
func (n *raftNode) Process(ctx context.Context, m raftpb.Message) error {
return n.Step(ctx, m)
}