-
Notifications
You must be signed in to change notification settings - Fork 699
/
service.go
180 lines (150 loc) · 3.58 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package cluster
import (
"encoding/json"
"fmt"
"log"
"net"
"os"
"sync"
"time"
)
// connectionTimeout is the timeout handed to Transport.Dial when SetPeer
// has to contact the leader over the network.
const connectionTimeout = 10 * time.Second
// response is the JSON reply sent back to a node that requested a peers
// update. The zero value means success; both fields are omitted from the
// encoding when empty.
type response struct {
	Code    int    `json:"code,omitempty"`
	Message string `json:"message,omitempty"`
}

// respOKMarshalled is the JSON encoding of the zero-value (success) response.
// It is computed once at package initialisation and written as-is by
// handleConn.
var respOKMarshalled []byte

// init pre-encodes the success response. Failing to encode it is treated as
// a programming error, hence the panic.
func init() {
	encoded, err := json.Marshal(response{})
	if err != nil {
		panic(fmt.Sprintf("unable to JSON marshal OK response: %s", err.Error()))
	}
	respOKMarshalled = encoded
}
// Transport is the interface the network service must provide.
// It embeds net.Listener, which the Service uses to accept inbound
// connections (Accept), report its address (Addr) and shut down (Close),
// and adds Dial for opening outbound connections.
type Transport interface {
net.Listener
// Dial is used to create a new outgoing connection to address.
// NOTE(review): timeout presumably bounds connection establishment only;
// confirm against the concrete implementation.
Dial(address string, timeout time.Duration) (net.Conn, error)
}
// Store represents a store of information, managed via consensus.
// The Service applies peer updates to it and asks it which node to
// forward an update to when the local apply fails.
type Store interface {
// Leader returns the leader of the consensus system.
// SetPeer treats an empty string as "no leader available" and otherwise
// passes the returned value to Transport.Dial.
Leader() string
// UpdateAPIPeers updates the API peers on the store.
// SetPeer builds the map with a Raft address as key and the matching
// API address as value.
UpdateAPIPeers(peers map[string]string) error
}
// Service allows access to the cluster and associated meta data,
// via consensus.
type Service struct {
// tn accepts inbound connections and dials other nodes.
tn Transport
// store is where peer updates are applied and the leader is looked up.
store Store
// addr is the listen address, captured from tn.Addr() in NewService.
addr net.Addr
// wg tracks the serve goroutine so that Close can wait for it to exit.
wg sync.WaitGroup
// logger receives the service's log output.
logger *log.Logger
}
// NewService returns a new instance of the cluster service.
//
// The service listens and dials through tn and applies peer updates to
// store. The listen address is read from tn once, at construction time, and
// log output goes to standard error with a "[cluster] " prefix.
func NewService(tn Transport, store Store) *Service {
	svc := new(Service)
	svc.tn = tn
	svc.store = store
	svc.addr = tn.Addr()
	svc.logger = log.New(os.Stderr, "[cluster] ", log.LstdFlags)
	return svc
}
// Open opens the Service.
//
// It starts the accept loop in its own goroutine, registered with the wait
// group so that Close can wait for it, and logs the listen address. It
// always returns nil.
func (s *Service) Open() error {
	s.wg.Add(1)
	go s.serve()

	listenAddr := s.tn.Addr()
	s.logger.Println("service listening on", listenAddr)
	return nil
}
// Close closes the service.
//
// It closes the transport and then blocks until the accept loop started by
// Open has exited. It always returns nil.
func (s *Service) Close() error {
	// The listener's close error is deliberately discarded; the accept loop
	// is waited for regardless.
	_ = s.tn.Close()
	s.wg.Wait()
	return nil
}
// Addr returns the address the service is listening on, in string form.
// The address is the one captured from the transport in NewService.
func (s *Service) Addr() string {
	listenAddr := s.addr
	return listenAddr.String()
}
// SetPeer will set the mapping between raftAddr and apiAddr for the entire cluster.
//
// The mapping is first applied to the local store. If that fails, the
// mapping is JSON-encoded and sent to the node reported by the store as
// leader, and the leader's JSON response is checked.
//
// It returns nil once either the local store or the leader has accepted the
// update. Otherwise it returns an error: no leader is known, the connection
// or encoding failed, or the leader replied with a non-zero code (in which
// case the error text is the leader's message).
func (s *Service) SetPeer(raftAddr, apiAddr string) error {
	peer := map[string]string{
		raftAddr: apiAddr,
	}

	// Try the local store. It might be the leader.
	if err := s.store.UpdateAPIPeers(peer); err == nil {
		return nil
	}

	// Try talking to the leader over the network. Look the leader up exactly
	// once so the address that was validated is the address that is dialled.
	leader := s.store.Leader()
	if leader == "" {
		return fmt.Errorf("no leader available")
	}

	conn, err := s.tn.Dial(leader, connectionTimeout)
	if err != nil {
		return err
	}
	defer conn.Close()

	b, err := json.Marshal(peer)
	if err != nil {
		return err
	}
	if _, err := conn.Write(b); err != nil {
		return err
	}

	// Wait for the response and verify the operation went through.
	resp := response{}
	if err := json.NewDecoder(conn).Decode(&resp); err != nil {
		return err
	}
	if resp.Code != 0 {
		// The message comes from the remote node; never use it as a format
		// string, or any '%' it contains would be misinterpreted.
		return fmt.Errorf("%s", resp.Message)
	}
	return nil
}
// serve is the accept loop started by Open. Each accepted connection is
// handed to handleConn in a new goroutine. The loop ends, returning the
// error, the first time Accept fails; the wait group is released on exit so
// that Close can return.
func (s *Service) serve() error {
	defer s.wg.Done()

	for {
		conn, err := s.tn.Accept()
		if err == nil {
			go s.handleConn(conn)
			continue
		}
		return err
	}
}
// handleConn services a single inbound connection.
//
// It decodes one JSON object mapping Raft addresses to API addresses,
// applies it to the store, and writes back a JSON response: the
// pre-encoded OK response on success, or a response with code 1 and the
// error text on failure. The connection is always closed before returning,
// so a peer blocked waiting for a reply is released even when no response
// could be produced.
func (s *Service) handleConn(conn net.Conn) {
	// Close on every path: without this the connection leaks after a
	// successful write, and a request that fails to decode would leave the
	// remote side waiting for a response that never arrives.
	defer conn.Close()

	s.logger.Printf("received connection from %s", conn.RemoteAddr().String())

	// Only handles peers updates for now.
	peers := make(map[string]string)
	if err := json.NewDecoder(conn).Decode(&peers); err != nil {
		return
	}

	// Update the peers.
	if err := s.store.UpdateAPIPeers(peers); err != nil {
		b, err := json.Marshal(response{Code: 1, Message: err.Error()})
		if err != nil {
			// Closing the connection is the only way left to signal.
			return
		}
		if _, err := conn.Write(b); err != nil {
			s.logger.Printf("failed to write error response to %s: %s",
				conn.RemoteAddr().String(), err.Error())
		}
		return
	}

	// Let the remote node know everything went OK.
	if _, err := conn.Write(respOKMarshalled); err != nil {
		s.logger.Printf("failed to write OK response to %s: %s",
			conn.RemoteAddr().String(), err.Error())
	}
}