forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
134 lines (115 loc) · 2.57 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package influxdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
)
// File names used inside a node's directory.
const (
// nodeFile is the JSON file that LoadNode reads and Save writes.
nodeFile = "node.json"
// oldNodeFile is the legacy file whose contents upgradeNodeFile parses as
// the node ID; it is removed once the upgraded node has been saved.
oldNodeFile = "id"
// peersFilename is the JSON list of peers that upgradeNodeFile inspects to
// decide whether an automatic upgrade is allowed.
peersFilename = "peers.json"
)
// Node holds the identity of this node and the meta servers it knows about.
// It is persisted as JSON by Save and read back by LoadNode.
type Node struct {
// path is the directory that contains the node file. The field is
// unexported, so encoding/json neither writes nor reads it.
path string
// ID is the node's numeric identifier.
ID uint64
// MetaServers is the set of meta server addresses known to this node.
MetaServers []string
}
// LoadNode reads the node file stored under path and returns the decoded Node.
//
// Before reading, it gives upgradeNodeFile a chance to convert a legacy ID
// file into the current format; addrs is only forwarded to that upgrade step.
// An error is returned if the upgrade fails, if the node file cannot be
// opened (including when it does not exist), or if its contents are not
// valid JSON for a Node.
func LoadNode(path string, addrs []string) (*Node, error) {
	// The upgrade must run first so that a freshly converted node file is
	// the one picked up below.
	if upgradeErr := upgradeNodeFile(path, addrs); upgradeErr != nil {
		return nil, upgradeErr
	}

	nodePath := filepath.Join(path, nodeFile)
	fh, err := os.Open(nodePath)
	if err != nil {
		return nil, err
	}
	defer fh.Close()

	// path is not part of the JSON document, so set it by hand.
	node := &Node{path: path}
	decoder := json.NewDecoder(fh)
	if err = decoder.Decode(node); err != nil {
		return nil, err
	}

	return node, nil
}
// NewNode returns a Node rooted at path that knows about the given meta
// server addresses. Nothing is written to disk; call Save to persist it.
func NewNode(path string, addrs []string) *Node {
	node := new(Node)
	node.path = path
	node.MetaServers = addrs
	return node
}
// Save writes the node to the node file under n.path, replacing any existing
// file.
//
// The JSON is first written to a temporary file next to the target, flushed
// to disk and closed, and only then renamed over the node file, so readers
// never observe a partially written document. If any step fails the
// temporary file is removed and the error from that step is returned; the
// previous node file, if any, is left untouched.
func (n *Node) Save() error {
	file := filepath.Join(n.path, nodeFile)
	// The suffix is appended without a separator (".../node.jsontmp").
	tmpFile := file + "tmp"

	f, err := os.Create(tmpFile)
	if err != nil {
		return err
	}

	// discard abandons the temporary file. Its own errors are ignored on
	// purpose: the caller needs the primary failure, not the cleanup result.
	discard := func() {
		_ = f.Close()
		_ = os.Remove(tmpFile)
	}

	if err := json.NewEncoder(f).Encode(n); err != nil {
		discard()
		return err
	}

	// Make sure the data is durable before it replaces the old file.
	if err := f.Sync(); err != nil {
		discard()
		return err
	}

	// Close explicitly, before the rename: a deferred Close would run after
	// it (renaming an open file fails on Windows) and would swallow the
	// error reported by Close.
	if err := f.Close(); err != nil {
		_ = os.Remove(tmpFile)
		return err
	}

	if err := os.Rename(tmpFile, file); err != nil {
		_ = os.Remove(tmpFile)
		return err
	}
	return nil
}
// AddMetaServers adds the addrs to the set of MetaServers known to this node.
// If an addr already exists, it will not be re-added.
//
// The resulting order is deterministic: addresses already known keep their
// relative order, followed by the new addresses in the order given.
// Duplicates within either list are dropped, keeping the first occurrence.
// n.MetaServers is always replaced by a freshly allocated, non-nil slice, so
// it is encoded as a JSON array even when empty.
func (n *Node) AddMetaServers(addrs []string) {
	total := len(n.MetaServers) + len(addrs)
	seen := make(map[string]struct{}, total)
	merged := make([]string, 0, total)

	// Walk the existing servers first so they stay ahead of the new ones.
	for _, list := range [][]string{n.MetaServers, addrs} {
		for _, addr := range list {
			if _, dup := seen[addr]; dup {
				continue
			}
			seen[addr] = struct{}{}
			merged = append(merged, addr)
		}
	}

	n.MetaServers = merged
}
// upgradeNodeFile converts a legacy node ID file found under path into the
// current JSON node file.
//
// It does nothing and returns nil when the legacy ID file is missing or
// empty, or when the peers file is missing. Otherwise it parses the peers
// file; if it lists more than one peer the upgrade is refused with an error.
// For at most one peer, the legacy file contents are parsed as a base-10
// uint64 ID, a Node with that ID and addrs as its MetaServers is saved, and
// the legacy ID file is removed.
//
// Errors from reading either file, decoding the peers file, parsing the ID,
// saving the node or removing the legacy file are returned as-is.
func upgradeNodeFile(path string, addrs []string) error {
	oldFile := filepath.Join(path, oldNodeFile)
	b, err := ioutil.ReadFile(oldFile)
	if err != nil {
		if os.IsNotExist(err) {
			// No legacy file: nothing to upgrade.
			return nil
		}
		return err
	}
	// We shouldn't have an empty ID file, but if we do, ignore it
	if len(b) == 0 {
		return nil
	}

	pb, err := ioutil.ReadFile(filepath.Join(path, peersFilename))
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}

	// A peers file that cannot be decoded must stop the upgrade: treating it
	// as "no peers" could silently upgrade a node that belongs to a cluster.
	peers := []string{}
	if err := json.Unmarshal(pb, &peers); err != nil {
		return err
	}
	if len(peers) > 1 {
		return fmt.Errorf("to upgrade a cluster, please contact support at influxdata")
	}

	// The contents are parsed verbatim; ParseUint rejects surrounding
	// whitespace, so the file must contain only the digits.
	id, err := strconv.ParseUint(string(b), 10, 64)
	if err != nil {
		return err
	}

	n := &Node{
		path:        path,
		ID:          id,
		MetaServers: addrs,
	}
	if err := n.Save(); err != nil {
		return err
	}

	// Only drop the legacy file once the new one is safely in place.
	return os.Remove(oldFile)
}