-
Notifications
You must be signed in to change notification settings - Fork 0
/
slave.go
100 lines (79 loc) · 2.23 KB
/
slave.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package dkvs
import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
)
func (n *Node) checkMasterHealth() error {
return errorNotImplemented
}
func (n *Node) electNewLeader() (*Node, error) {
return nil, errorNotImplemented
}
func (n *Node) promoteToMaster() error {
return errorNotImplemented
}
// ReceiveListUpdate applies a nodes list update sent from the master
func (n *Node) ReceiveListUpdate(nodes map[string]*Node) error {
n.nMutex.Lock()
defer n.nMutex.Unlock()
n.nodes = nodes
n.MasterID = nodes[n.ID].MasterID
return nil
}
// ReceiveWrite applies a write sent from the master
func (n *Node) ReceiveWrite(key, val string) error {
if n.IsMaster() {
return errorNotSlave
}
if err := n.storage.Set(key, val); err != nil {
return err
}
log.Printf("node %s replicated key %s", n.ID, key)
return nil
}
// ReplicateFromMaster will read a stream of data from the master and save it
// locally to this slave
// next steps:
// 1/ until the initial replication is complete, slave shouldn't accept reads: it
// will either return an error or redirect the read query to another node.
// 2/ slaves will still accept writes but store them in an ordered queue. It
// will apply all the writes in sequential order (first in, first out) once the
// replication is done
func (n *Node) ReplicateFromMaster(r io.Reader) error {
err := n.storage.ReplicateFrom(r)
if err != nil {
log.Println("shutting the node down because replication failed: ", err)
defer n.Close()
}
log.Printf("node %s replicated the database", n.ID)
return err
}
// NewSlave creates a new node that joins an existing master
func NewSlave(addr, master string) (*Node, error) {
n, err := newNode(addr)
if err != nil {
defer n.Close()
return nil, fmt.Errorf("creating node: %v", err)
}
url := "http://" + master + "/join"
payload, _ := json.Marshal(n)
buffer := bytes.NewBuffer(payload)
resp, err := http.Post(url, encoding, buffer)
if err != nil {
defer n.Close()
return nil, fmt.Errorf("joining master: %v", err)
}
defer resp.Body.Close()
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
body := buf.String()
if resp.StatusCode != 200 {
defer n.Close()
return nil, fmt.Errorf("joining master bad response: %v", body)
}
return n, err
}