/
walwrap.go
255 lines (225 loc) · 7.26 KB
/
walwrap.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
package storage
import (
"context"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/coreos/etcd/wal/walpb"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/encryption"
"github.com/pkg/errors"
)
// This package wraps the github.com/coreos/etcd/wal package, and encrypts
// the bytes of whatever entry is passed to it, and decrypts the bytes of
// whatever entry it reads.
// WAL is the subset of the github.com/coreos/etcd/wal.WAL API that this
// package depends upon. Both *wal.WAL and the encrypting wrapper defined in
// this file satisfy it.
type WAL interface {
	// ReadAll returns the metadata, hard state and entries held by the WAL.
	ReadAll() ([]byte, raftpb.HardState, []raftpb.Entry, error)
	// Save mirrors wal.WAL.Save for the given hard state and entries.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnapshot mirrors wal.WAL.SaveSnapshot for the given snapshot record.
	SaveSnapshot(e walpb.Snapshot) error
	// ReleaseLockTo mirrors wal.WAL.ReleaseLockTo; see the etcd documentation
	// for the exact meaning of index.
	ReleaseLockTo(index uint64) error
	// Close releases the WAL. Callers in this file close every WAL they open.
	Close() error
}
// WALFactory abstracts over the different ways of obtaining a WAL object.
// The etcd/wal package itself offers these two entry points, and this file
// also provides a factory that layers encryption on top of them.
type WALFactory interface {
	// Create makes a new WAL in dirpath, recording metadata in it.
	Create(dirpath string, metadata []byte) (WAL, error)
	// Open opens the WAL that already exists in dirpath, positioned at walsnap.
	Open(dirpath string, walsnap walpb.Snapshot) (WAL, error)
}
// Compile-time checks that the concrete types in play satisfy the interfaces
// declared above.
var (
	_ WAL        = (*wrappedWAL)(nil)
	_ WAL        = (*wal.WAL)(nil)
	_ WALFactory = walCryptor{}
)
// wrappedWAL wraps a github.com/coreos/etcd/wal.WAL, encrypting entry data on
// Save and decrypting it on ReadAll.
type wrappedWAL struct {
	// The embedded etcd WAL supplies every method that is not overridden on
	// wrappedWAL itself.
	*wal.WAL

	// encrypter is handed to encryption.Encrypt by Save.
	encrypter encryption.Encrypter
	// decrypter is handed to encryption.Decrypt by ReadAll.
	decrypter encryption.Decrypter
}
// ReadAll wraps the wal.WAL.ReadAll() function: it reads everything from the
// underlying WAL and then passes the data of every entry through
// encryption.Decrypt using the configured decrypter.
//
// If the underlying read fails, its results are returned unchanged. If any
// entry cannot be decrypted, no metadata, state or entries are returned, only
// the decryption error.
func (w *wrappedWAL) ReadAll() ([]byte, raftpb.HardState, []raftpb.Entry, error) {
	metadata, state, ents, err := w.WAL.ReadAll()
	if err != nil {
		return metadata, state, ents, err
	}

	// Decrypt in place, one entry at a time, stopping at the first failure.
	for i := range ents {
		entry := &ents[i]
		if entry.Data, err = encryption.Decrypt(entry.Data, w.decrypter); err != nil {
			return nil, raftpb.HardState{}, nil, err
		}
	}
	return metadata, state, ents, nil
}
// Save encrypts the data of every entry (via encryption.Encrypt with the
// configured encrypter) before passing the entries on to the wrapped
// wal.WAL's Save function.
//
// The caller's entries are left untouched: encrypted copies carrying the same
// Index, Term and Type are what get written. If any entry fails to encrypt,
// nothing is saved and the encryption error is returned.
func (w *wrappedWAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
	// The final length is known up front, so allocate once instead of letting
	// append grow the slice repeatedly.
	writeEnts := make([]raftpb.Entry, 0, len(ents))
	for _, ent := range ents {
		data, err := encryption.Encrypt(ent.Data, w.encrypter)
		if err != nil {
			return err
		}
		writeEnts = append(writeEnts, raftpb.Entry{
			Index: ent.Index,
			Term:  ent.Term,
			Type:  ent.Type,
			Data:  data,
		})
	}
	return w.WAL.Save(st, writeEnts)
}
// walCryptor is a WALFactory offering the same Create and Open entry points
// as the `etcd/wal` package, except that the WALs it hands out encrypt and
// decrypt entry data with the encrypter and decrypter stored here.
type walCryptor struct {
	// encrypter is given to every WAL this factory produces, for use on writes.
	encrypter encryption.Encrypter
	// decrypter is given to every WAL this factory produces, for use on reads.
	decrypter encryption.Decrypter
}
// NewWALFactory builds a WALFactory whose WALs use encrypter when writing
// entries to disk and decrypter when reading them back.
func NewWALFactory(encrypter encryption.Encrypter, decrypter encryption.Decrypter) WALFactory {
	var factory walCryptor
	factory.encrypter = encrypter
	factory.decrypter = decrypter
	return factory
}
// Create makes a new etcd WAL in dirpath with the given metadata and returns
// it wrapped with this factory's encrypter and decrypter. Any error from
// wal.Create is returned as is, together with a nil WAL.
func (wc walCryptor) Create(dirpath string, metadata []byte) (WAL, error) {
	underlying, err := wal.Create(dirpath, metadata)
	if err != nil {
		return nil, err
	}

	wrapped := &wrappedWAL{WAL: underlying}
	wrapped.encrypter = wc.encrypter
	wrapped.decrypter = wc.decrypter
	return wrapped, nil
}
// Open opens the existing etcd WAL in dirpath at the given snapshot and
// returns it wrapped with this factory's encrypter and decrypter. Any error
// from wal.Open is returned as is, together with a nil WAL.
func (wc walCryptor) Open(dirpath string, snap walpb.Snapshot) (WAL, error) {
	underlying, err := wal.Open(dirpath, snap)
	if err != nil {
		return nil, err
	}

	wrapped := &wrappedWAL{WAL: underlying}
	wrapped.encrypter = wc.encrypter
	wrapped.decrypter = wc.decrypter
	return wrapped, nil
}
// originalWAL is a stateless WALFactory whose Create and Open call straight
// through to the etcd `wal` package, without adding any encryption layer.
type originalWAL struct{}
// Create makes a new etcd WAL in dirpath with the given metadata by calling
// wal.Create, and returns it without any wrapping. On failure the returned
// WAL is a true nil interface value.
func (o originalWAL) Create(dirpath string, metadata []byte) (WAL, error) {
	w, err := wal.Create(dirpath, metadata)
	if err != nil {
		// Forwarding wal.Create's results directly would box whatever
		// *wal.WAL it returned (a nil pointer included) into a non-nil WAL
		// interface; return a literal nil instead.
		return nil, err
	}
	return w, nil
}
// Open opens the existing etcd WAL in dirpath at walsnap by calling wal.Open,
// and returns it without any wrapping. On failure the returned WAL is a true
// nil interface value.
func (o originalWAL) Open(dirpath string, walsnap walpb.Snapshot) (WAL, error) {
	w, err := wal.Open(dirpath, walsnap)
	if err != nil {
		// Forwarding wal.Open's results directly would box whatever *wal.WAL
		// it returned (a nil pointer included) into a non-nil WAL interface;
		// return a literal nil instead.
		return nil, err
	}
	return w, nil
}
// OriginalWAL is the original `wal` package exposed as an implementation of
// the WALFactory interface: the WALs it creates and opens come directly from
// wal.Create and wal.Open.
var OriginalWAL WALFactory = originalWAL{}
// WALData contains all the data returned by a WAL's ReadAll() function
// (metadata, hard state, and entries).
type WALData struct {
	// Metadata is the raw metadata returned by ReadAll.
	Metadata []byte
	// HardState is the raft hard state returned by ReadAll.
	HardState raftpb.HardState
	// Entries are the raft log entries returned by ReadAll.
	Entries []raftpb.Entry
}
// ReadRepairWAL opens the WAL in walDir through factory and reads all of it.
// If the read fails with io.ErrUnexpectedEOF, it asks wal.Repair to fix the
// directory and then opens and reads once more; a repair is attempted at most
// once.
//
// On success the still-open WAL is returned together with everything that was
// read from it, so the caller is the one who has to close it. On failure the
// WAL and WALData results are zero values and the error is one of:
//   - the open error, wrapped with "failed to open WAL";
//   - the error from closing the reader after a failed read, unwrapped;
//   - an encryption.ErrCannotDecrypt, wrapped with "failed to decrypt WAL";
//   - any other read error, or a second failure after a repair, wrapped with
//     "irreparable WAL error";
//   - the read error wrapped with "WAL error cannot be repaired" when
//     wal.Repair reports failure.
func ReadRepairWAL(
	ctx context.Context,
	walDir string,
	walsnap walpb.Snapshot,
	factory WALFactory,
) (WAL, WALData, error) {
	// The loop body runs at most twice: alreadyRepaired flips to true only
	// after a successful repair, and a second failure always returns.
	for alreadyRepaired := false; ; alreadyRepaired = true {
		reader, openErr := factory.Open(walDir, walsnap)
		if openErr != nil {
			return nil, WALData{}, errors.Wrap(openErr, "failed to open WAL")
		}

		metadata, hardState, entries, readErr := reader.ReadAll()
		if readErr == nil {
			data := WALData{
				Metadata:  metadata,
				HardState: hardState,
				Entries:   entries,
			}
			return reader, data, nil
		}

		// The reader is of no further use after a failed read; close it before
		// deciding what to do about the read error.
		if closeErr := reader.Close(); closeErr != nil {
			return nil, WALData{}, closeErr
		}

		_, undecryptable := readErr.(encryption.ErrCannotDecrypt)
		switch {
		case undecryptable:
			return nil, WALData{}, errors.Wrap(readErr, "failed to decrypt WAL")
		case alreadyRepaired || readErr != io.ErrUnexpectedEOF:
			// we can only repair ErrUnexpectedEOF and we never repair twice.
			return nil, WALData{}, errors.Wrap(readErr, "irreparable WAL error")
		case !wal.Repair(walDir):
			return nil, WALData{}, errors.Wrap(readErr, "WAL error cannot be repaired")
		}

		log.G(ctx).WithError(readErr).Info("repaired WAL error")
	}
}
// MigrateWALs reads the existing WAL (from a particular snapshot and beyond)
// in oldDir using oldFactory, and writes it to newDir using newFactory, so
// that the data ends up encoded the way newFactory encodes it.
//
// The new WAL is first built in a sibling temporary directory (newDir with a
// ".tmp" suffix) and only renamed to newDir once it has been written and
// closed successfully, so that WAL initialization appears atomic. The
// temporary directory is removed on every exit path.
//
// Errors from reading the old WAL, from closing the new WAL and from the
// final rename are returned as is; the other failures are wrapped with a
// message naming the step that failed.
func MigrateWALs(ctx context.Context, oldDir, newDir string, oldFactory, newFactory WALFactory, snapshot walpb.Snapshot) error {
	oldReader, waldata, err := ReadRepairWAL(ctx, oldDir, snapshot, oldFactory)
	if err != nil {
		return err
	}
	// Best effort: everything needed from the old WAL is already in waldata,
	// so a failure to close the reader does not abort the migration.
	_ = oldReader.Close()

	if err := os.MkdirAll(filepath.Dir(newDir), 0700); err != nil {
		return errors.Wrap(err, "could not create parent directory")
	}

	// keep temporary wal directory so WAL initialization appears atomic
	tmpdirpath := filepath.Clean(newDir) + ".tmp"
	if err := os.RemoveAll(tmpdirpath); err != nil {
		return errors.Wrap(err, "could not remove temporary WAL directory")
	}
	defer os.RemoveAll(tmpdirpath)

	tmpWAL, err := newFactory.Create(tmpdirpath, waldata.Metadata)
	if err != nil {
		return errors.Wrap(err, "could not create new WAL in temporary WAL directory")
	}
	// Close the temporary WAL on early-exit paths only. Once the explicit
	// Close below has run, the WAL must not be closed a second time. Deferred
	// calls run last-in-first-out, so this still happens before the temporary
	// directory is removed.
	tmpWALClosed := false
	defer func() {
		if !tmpWALClosed {
			_ = tmpWAL.Close()
		}
	}()

	if err := tmpWAL.SaveSnapshot(snapshot); err != nil {
		return errors.Wrap(err, "could not write WAL snapshot in temporary directory")
	}

	if err := tmpWAL.Save(waldata.HardState, waldata.Entries); err != nil {
		return errors.Wrap(err, "could not migrate WALs to temporary directory")
	}

	// The WAL has to be closed before its directory is renamed; a close
	// failure aborts the migration.
	tmpWALClosed = true
	if err := tmpWAL.Close(); err != nil {
		return err
	}

	return os.Rename(tmpdirpath, newDir)
}
// ListWALs returns the names (not full paths) of all entries in dirpath whose
// name ends in ".wal", sorted in lexical order. For WAL file names that order
// is expected to be oldest first.
//
// The error from reading the directory is returned unchanged. When nothing
// matches, the returned slice is nil.
func ListWALs(dirpath string) ([]string, error) {
	entries, err := ioutil.ReadDir(dirpath)
	if err != nil {
		return nil, err
	}

	var names []string
	for i := range entries {
		name := entries[i].Name()
		if !strings.HasSuffix(name, ".wal") {
			continue
		}
		names = append(names, name)
	}

	// Sort WAL filenames in lexical order
	sort.Strings(names)
	return names, nil
}