-
-
Notifications
You must be signed in to change notification settings - Fork 182
/
recover.go
247 lines (207 loc) · 5.82 KB
/
recover.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package cluster
import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	dqlite "github.com/canonical/go-dqlite"
	client "github.com/canonical/go-dqlite/client"

	"github.com/lxc/lxd/lxd/db"
	"github.com/lxc/lxd/lxd/node"
)
// ListDatabaseNodes returns the addresses of the raft members recorded in the
// local node database whose role is db.RaftVoter.
//
// Despite the historical name, the returned values are network addresses, not
// member names. Members with any other raft role are skipped. On success the
// result is never nil: it is an empty slice when no voter is recorded.
func ListDatabaseNodes(database *db.Node) ([]string, error) {
	var nodes []db.RaftNode
	err := database.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		var err error
		nodes, err = tx.GetRaftNodes(ctx)
		return err
	})
	if err != nil {
		return nil, fmt.Errorf("Failed to list database nodes: %w", err)
	}

	// Pre-size for the common case where every member is a voter. Using make
	// (rather than a nil slice) keeps the non-nil empty result.
	addresses := make([]string, 0, len(nodes))
	for _, raftNode := range nodes {
		if raftNode.Role != db.RaftVoter {
			continue
		}

		addresses = append(addresses, raftNode.Address)
	}

	return addresses, nil
}
// Recover attempts data recovery on the cluster database by forcing a raft
// configuration that contains only this member.
//
// The dqlite data directory is <database dir>/global. A dqlite node is created
// there with this member's current raft ID and address, and it is asked to
// recover using a single-entry configuration listing only itself. Afterwards
// the local raft_nodes table is replaced with that same single entry.
//
// An error is returned if this instance has no database role, if it is not
// clustered (empty address), or if any of the steps fail.
func Recover(database *db.Node) error {
	// Work out whether, and as which raft node, this instance participates.
	var self *db.RaftNode
	err := database.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		raftNode, err := node.DetermineRaftNode(ctx, tx)
		self = raftNode
		return err
	})
	if err != nil {
		return fmt.Errorf("Failed to determine node role: %w", err)
	}

	switch {
	case self == nil:
		// Not a database node at all.
		return fmt.Errorf("This LXD instance has no database role")
	case self.Address == "":
		// Standalone instance that is not exposed to the network.
		return fmt.Errorf("This LXD instance is not clustered")
	}

	id := uint64(self.ID)
	server, err := dqlite.New(id, self.Address, filepath.Join(database.Dir(), "global"))
	if err != nil {
		return fmt.Errorf("Failed to create dqlite server: %w", err)
	}

	// Force a configuration consisting of this node alone.
	err = server.Recover([]dqlite.NodeInfo{{ID: id, Address: self.Address}})
	if err != nil {
		return fmt.Errorf("Failed to recover database state: %w", err)
	}

	// Mirror the single-member configuration in the local raft_nodes table.
	sole := db.RaftNode{
		NodeInfo: client.NodeInfo{ID: self.ID, Address: self.Address},
		Name:     self.Name,
	}

	err = database.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		return tx.ReplaceRaftNodes([]db.RaftNode{sole})
	})
	if err != nil {
		return fmt.Errorf("Failed to update database nodes: %w", err)
	}

	return nil
}
// updateLocalAddress sets cluster.https_address in this member's local node
// configuration to address.
//
// The configuration is loaded and patched inside a single local database
// transaction. Only the cluster.https_address key is touched. Any failure is
// wrapped as "Failed to update node configuration".
func updateLocalAddress(database *db.Node, address string) error {
	err := database.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		config, err := node.ConfigLoad(ctx, tx)
		if err != nil {
			return err
		}

		_, err = config.Patch(map[string]any{"cluster.https_address": address})
		return err
	})
	if err != nil {
		return fmt.Errorf("Failed to update node configuration: %w", err)
	}

	return nil
}
// Reconfigure replaces the entire cluster configuration.
// Addresses and node roles may be updated. Node IDs are read-only.
//
// The steps, in order, are:
//  1. update cluster.https_address if this member's own entry in raftNodes
//     carries a different address;
//  2. replace the raft membership stored by dqlite under <dir>/global;
//  3. replace the local raft_nodes table with raftNodes;
//  4. append one UPDATE statement per member to <dir>/patch.global.sql, so
//     that addresses in the global "nodes" table can be corrected (the file
//     is presumably applied to the global database on next start — confirm).
//
// The steps are not transactional as a whole: a failure part-way through
// leaves the earlier steps applied.
func Reconfigure(database *db.Node, raftNodes []db.RaftNode) error {
	var info *db.RaftNode
	err := database.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		var err error
		info, err = node.DetermineRaftNode(ctx, tx)
		return err
	})
	if err != nil {
		return fmt.Errorf("Failed to determine cluster member raft role: %w", err)
	}

	if info == nil {
		return fmt.Errorf("This cluster member has no raft role")
	}

	localAddress := info.Address
	nodes := make([]client.NodeInfo, 0, len(raftNodes))
	for _, raftNode := range raftNodes {
		nodes = append(nodes, raftNode.NodeInfo)

		// Pick up the new address for this member, if it is listed.
		if raftNode.ID == info.ID {
			localAddress = raftNode.Address
		}
	}

	// Update cluster.https_address if changed.
	if localAddress != info.Address {
		err = updateLocalAddress(database, localAddress)
		if err != nil {
			return err
		}
	}

	dir := filepath.Join(database.Dir(), "global")

	// Replace cluster configuration in dqlite.
	err = dqlite.ReconfigureMembershipExt(dir, nodes)
	if err != nil {
		return fmt.Errorf("Failed to recover database state: %w", err)
	}

	// Replace cluster configuration in local raft_nodes database.
	err = database.Transaction(context.TODO(), func(ctx context.Context, tx *db.NodeTx) error {
		return tx.ReplaceRaftNodes(raftNodes)
	})
	if err != nil {
		return fmt.Errorf("Failed to update database nodes: %w", err)
	}

	// Nothing to patch in the global database.
	if len(nodes) == 0 {
		return nil
	}

	// Build the patch for the global nodes table. Each address is emitted as
	// a single-quoted SQL string literal with embedded quotes doubled. Go's %q
	// would instead produce a double-quoted token (an identifier in SQL) with
	// backslash escapes that SQLite does not interpret.
	var patch strings.Builder
	for _, member := range nodes {
		quoted := "'" + strings.ReplaceAll(member.Address, "'", "''") + "'"
		fmt.Fprintf(&patch, "UPDATE nodes SET address = %s WHERE id = %d;\n", quoted, member.ID)
	}

	filePath := filepath.Join(database.Dir(), "patch.global.sql")
	file, err := os.OpenFile(filePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}

	// Safety net for the early-return path. The explicit Close below reports
	// flush errors on the normal path; the second Close is a harmless no-op.
	defer func() { _ = file.Close() }()

	_, err = file.WriteString(patch.String())
	if err != nil {
		return err
	}

	return file.Close()
}
// RemoveRaftNode removes the raft member with the given address from the raft
// configuration.
//
// The member's ID is looked up among the gateway's current raft nodes. The
// removal is then requested through the cluster leader. Finding the leader
// and the Remove call share a single one-minute timeout. An error is returned
// if no current raft node has that address, or if any step fails.
func RemoveRaftNode(gateway *Gateway, address string) error {
	nodes, err := gateway.currentRaftNodes()
	if err != nil {
		return fmt.Errorf("Failed to get current raft nodes: %w", err)
	}

	// ID 0 doubles as the "not found" sentinel. This presumes raft never
	// assigns ID 0 to a member — confirm against dqlite.
	var id uint64
	for _, member := range nodes {
		if member.Address == address {
			id = member.ID
			break
		}
	}

	if id == 0 {
		return fmt.Errorf("No raft node with address %q", address)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	// Named "leader" so the imported client package is not shadowed.
	leader, err := client.FindLeader(
		ctx, gateway.NodeStore(),
		client.WithDialFunc(gateway.raftDial()),
		client.WithLogFunc(DqliteLog),
	)
	if err != nil {
		return fmt.Errorf("Failed to connect to cluster leader: %w", err)
	}

	defer func() { _ = leader.Close() }()

	err = leader.Remove(ctx, id)
	if err != nil {
		return fmt.Errorf("Failed to remove node: %w", err)
	}

	return nil
}