// server.go (forked from efficient/epaxos)
package main
import (
	"flag"
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
	"os"
	"os/signal"
	"runtime"
	"runtime/pprof"
	"syscall"
	"time"

	"epaxos"
	"gpaxos"
	"masterproto"
	"mencius"
	"paxos"
)
// Command-line flags. Each variable is a pointer filled in by flag.Parse,
// which main calls first; dereference only after that.
var (
	// Network endpoints of this server and of the master it registers with.
	portnum    = flag.Int("port", 7070, "Port # to listen on. Defaults to 7070")
	masterAddr = flag.String("maddr", "", "Master address. Defaults to localhost.")
	masterPort = flag.Int("mport", 7087, "Master port. Defaults to 7087.")
	myAddr     = flag.String("addr", "", "Server address (this machine). Defaults to localhost.")

	// Replication protocol selection; main falls back to classic Paxos when
	// none of these is set.
	doMencius = flag.Bool("m", false, "Use Mencius as the replication protocol. Defaults to false.")
	doGpaxos  = flag.Bool("g", false, "Use Generalized Paxos as the replication protocol. Defaults to false.")
	doEpaxos  = flag.Bool("e", false, "Use EPaxos as the replication protocol. Defaults to false.")

	// Runtime and profiling knobs.
	procs      = flag.Int("p", 2, "GOMAXPROCS. Defaults to 2")
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile to file")

	// Options handed to the replica constructors in main.
	thrifty             = flag.Bool("thrifty", false, "Use only as many messages as strictly required for inter-replica communication.")
	exec                = flag.Bool("exec", true, "Execute commands.")
	lread               = flag.Bool("lread", false, "Execute locally read command.")
	dreply              = flag.Bool("dreply", true, "Reply to client only after command has been executed.")
	beacon              = flag.Bool("beacon", false, "Send beacons to other replicas to compare their relative speeds.")
	maxfailures         = flag.Int("maxfailures", -1, "maximum number of maxfailures; default is a minority, ignored by other protocols than Paxos.")
	durable             = flag.Bool("durable", false, "Log to a stable store (i.e., a file in the current dir).")
	batchWait           = flag.Int("batchwait", 0, "Milliseconds to wait before sending a batch. If set to 0, batching is disabled. Defaults to 0.")
	transitiveConflicts = flag.Bool("transitiveconf", true, "Conflict relation is transitive.")
)
// main parses the command-line flags, registers this server with the master,
// builds the replica for the selected replication protocol, and then serves
// net/rpc over HTTP on port+1000 until the listener fails.
//
// Protocol selection precedence is -e (EPaxos), -m (Mencius), -g (Generalized
// Paxos); when none is set a classic Paxos replica is started.
func main() {
	flag.Parse()

	runtime.GOMAXPROCS(*procs)

	if *doMencius && *thrifty {
		log.Fatal("incompatble options -m -thrifty")
	}

	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal(err)
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			log.Fatal(err)
		}

		// Subscribe to termination signals only. Notify without a signal list
		// relays every incoming signal, so an unrelated one (e.g. SIGWINCH or
		// SIGCHLD) would make catchKill stop the profile and exit the server.
		interrupt := make(chan os.Signal, 1)
		signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
		go catchKill(interrupt)
	}

	log.Printf("Server starting on port %d\n", *portnum)

	replicaId, nodeList, isLeader := registerWithMaster(fmt.Sprintf("%s:%d", *masterAddr, *masterPort))

	// -maxfailures is honoured only by classic Paxos; for every other protocol,
	// and for Paxos when the flag is left at -1, use (n-1)/2 of the n nodes
	// reported by the master.
	if *doEpaxos || *doMencius || *doGpaxos || *maxfailures == -1 {
		*maxfailures = (len(nodeList) - 1) / 2
	}

	log.Printf("Tolerating %d max. failures\n", *maxfailures)

	// The replica types differ per protocol, so hold the chosen one as an
	// empty interface; rpc.Register accepts any receiver value.
	var rep interface{}
	switch {
	case *doEpaxos:
		log.Println("Starting Egalitarian Paxos replica...")
		rep = epaxos.NewReplica(replicaId, nodeList, *thrifty, *exec, *lread, *dreply, *beacon, *durable, *batchWait, *transitiveConflicts, *maxfailures)
	case *doMencius:
		log.Println("Starting Mencius replica...")
		rep = mencius.NewReplica(replicaId, nodeList, *thrifty, *exec, *lread, *dreply, *durable, *maxfailures)
	case *doGpaxos:
		log.Println("Starting Generalized Paxos replica...")
		rep = gpaxos.NewReplica(replicaId, nodeList, isLeader, *thrifty, *exec, *lread, *dreply, *maxfailures)
	default:
		log.Println("Starting classic Paxos replica...")
		rep = paxos.NewReplica(replicaId, nodeList, isLeader, *thrifty, *exec, *lread, *dreply, *durable, *batchWait, *maxfailures)
	}

	// Report a failed registration instead of dropping it, but keep running:
	// the previous code also continued to serve in that case.
	if err := rpc.Register(rep); err != nil {
		log.Printf("rpc register error: %v", err)
	}

	rpc.HandleHTTP()
	// Listen for RPC on a different port (8070 by default).
	l, err := net.Listen("tcp", fmt.Sprintf(":%d", *portnum+1000))
	if err != nil {
		log.Fatal("listen error:", err)
	}

	// http.Serve blocks and always returns a non-nil error when it stops, so
	// surface it rather than letting the process exit silently with status 0.
	log.Fatal("serve error: ", http.Serve(l, nil))
}
// registerWithMaster registers this server (the -addr and -port flag values)
// with the master listening at masterAddr, given as "host:port".
//
// It retries once per second, logging any dial or call error, until the
// Master.Register call succeeds and the reply has Ready set. It then returns
// the ReplicaId, NodeList and IsLeader fields of that reply. The function
// never gives up: it blocks for as long as the master is unreachable or not
// ready.
func registerWithMaster(masterAddr string) (int, []string, bool) {
	args := &masterproto.RegisterArgs{*myAddr, *portnum}
	var reply masterproto.RegisterReply

	for {
		log.Printf("connecting to: %v", masterAddr)
		mcli, err := rpc.DialHTTP("tcp", masterAddr)
		if err == nil {
			err = mcli.Call("Master.Register", args, &reply)
			// The client is needed for this single call only. Close it on
			// every attempt so retries do not pile up open connections to the
			// master; a close failure is not actionable here.
			_ = mcli.Close()
			if err == nil && reply.Ready {
				break
			}
		}
		if err != nil {
			log.Printf("%v", err)
		}
		time.Sleep(time.Second)
	}

	return reply.ReplicaId, reply.NodeList, reply.IsLeader
}
// catchKill waits for a single value on interrupt, stops the CPU profile if
// the -cpuprofile flag named an output file, prints a notice to standard
// output and terminates the process with exit status 0.
func catchKill(interrupt chan os.Signal) {
	// Which signal arrived is irrelevant; the first one triggers shutdown.
	<-interrupt

	profiling := *cpuprofile != ""
	if profiling {
		pprof.StopCPUProfile()
	}

	fmt.Fprintln(os.Stdout, "Caught signal")
	os.Exit(0)
}