// Source file: migration_volumes.go (forked from canonical/lxd).

package migration
import (
"fmt"
"io"
"github.com/lxc/lxd/lxd/operations"
"github.com/lxc/lxd/shared"
"github.com/lxc/lxd/shared/ioprogress"
"github.com/lxc/lxd/shared/units"
)
// Type represents the migration transport type. It indicates the method by which the migration can
// take place and what optional features are available.
//
// Features are plain strings. The names this file recognises are "compress" for ZFS, the
// BTRFSFeatureMigrationHeader and BTRFSFeatureSubvolumes constants for BTRFS, and "xattrs", "delete",
// "compress" and "bidirectional" for rsync based transports (see TypesToHeader).
type Type struct {
	FSType   MigrationFSType // Transport mode selected.
	Features []string        // Feature hints for selected FSType transport mode.
}
// VolumeSourceArgs represents the arguments needed to setup a volume migration source.
//
// NOTE(review): nothing in this file reads these fields, so the per-field notes below are inferred
// from the field names and types only. Confirm against the code that consumes this struct.
type VolumeSourceArgs struct {
	Name          string      // Presumably the name of the volume being sent.
	Snapshots     []string    // Presumably the names of the snapshots to send along with the volume.
	MigrationType Type        // Transport type and features to use for the transfer.
	TrackProgress bool        // Presumably enables progress reporting for the transfer.
	MultiSync     bool        // Presumably indicates the transfer is done in several sync phases - TODO confirm.
	FinalSync     bool        // Presumably marks the last phase of a MultiSync transfer - TODO confirm.
	Data          interface{} // Optional store to persist storage driver state between MultiSync phases.
	ContentType   string      // Presumably the content type of the volume - TODO confirm valid values.
}
// VolumeTargetArgs represents the arguments needed to setup a volume migration sink.
//
// NOTE(review): nothing in this file reads these fields, so the per-field notes below are inferred
// from the field names and types only. Confirm against the code that consumes this struct.
type VolumeTargetArgs struct {
	Name          string            // Presumably the name of the volume being received.
	Description   string            // Presumably the description to set on the received volume.
	Config        map[string]string // Presumably the configuration to apply to the received volume.
	Snapshots     []string          // Presumably the names of the snapshots expected from the source.
	MigrationType Type              // Transport type and features to use for the transfer.
	TrackProgress bool              // Presumably enables progress reporting for the transfer.
	Refresh       bool              // Presumably indicates an existing volume is being refreshed - TODO confirm.
	Live          bool              // Presumably indicates a live migration - TODO confirm.
	VolumeSize    int64             // Presumably the volume size; unit not visible here - TODO confirm.
	ContentType   string            // Presumably the content type of the volume - TODO confirm valid values.
}
// TypesToHeader builds a MigrationHeader from one or more Types. The first type supplied is treated
// as the preferred migration method and becomes the header's Fs type; when no types are supplied the
// zero value of Type is used instead. If the preferred type is ZFS or BTRFS, the matching optional
// feature set (ZfsFeatures or BtrfsFeatures) is filled in from its feature list. Regardless of which
// type is preferred, the first RSYNC or BLOCK_AND_RSYNC type found anywhere in types supplies the
// header's RsyncFeatures, allowing for fallback negotiation to take place on the farside.
func TypesToHeader(types ...Type) *MigrationHeader {
	// Every feature field is a *bool pointing at one of these two shared values.
	disabled := false
	enabled := true

	var preferred Type
	if len(types) > 0 {
		preferred = types[0]
	}

	header := &MigrationHeader{Fs: &preferred.FSType}

	// Only the preferred type contributes ZFS or BTRFS specific features.
	switch preferred.FSType {
	case MigrationFSType_ZFS:
		zfs := &ZfsFeatures{Compress: &disabled}
		for _, name := range preferred.Features {
			if name == "compress" {
				zfs.Compress = &enabled
			}
		}

		header.ZfsFeatures = zfs
	case MigrationFSType_BTRFS:
		btrfs := &BtrfsFeatures{
			MigrationHeader:  &disabled,
			HeaderSubvolumes: &disabled,
		}

		for _, name := range preferred.Features {
			switch {
			case name == BTRFSFeatureMigrationHeader:
				btrfs.MigrationHeader = &enabled
			case name == BTRFSFeatureSubvolumes:
				btrfs.HeaderSubvolumes = &enabled
			}
		}

		header.BtrfsFeatures = btrfs
	}

	// Scan every type (not just the preferred one) for an rsync based transport.
	for _, candidate := range types {
		usesRsync := candidate.FSType == MigrationFSType_RSYNC || candidate.FSType == MigrationFSType_BLOCK_AND_RSYNC
		if !usesRsync {
			continue
		}

		rsync := &RsyncFeatures{
			Xattrs:        &disabled,
			Delete:        &disabled,
			Compress:      &disabled,
			Bidirectional: &disabled,
		}

		for _, name := range candidate.Features {
			switch name {
			case "xattrs":
				rsync.Xattrs = &enabled
			case "delete":
				rsync.Delete = &enabled
			case "compress":
				rsync.Compress = &enabled
			case "bidirectional":
				rsync.Bidirectional = &enabled
			}
		}

		header.RsyncFeatures = rsync

		// Only the first rsync transport type found is used to generate the rsync features list.
		break
	}

	return header
}
// MatchTypes attempts to find matching migration transport types between an offered type sent from a remote
// source and the types supported by a local storage pool. If matches are found then one or more Types are
// returned containing the method and the matching optional features present in both. The function also takes a
// fallback type which is used as an additional offer type preference in case the preferred remote type is not
// compatible with the local type available. It is expected that both sides of the migration will support the
// fallback type for the volume's content type that is being migrated.
//
// Matches are returned in the order of ourTypes, so the local preference order decides which match comes
// first. When the offered type and the fallback type are the same, that type is only considered once so a
// single local type never produces duplicate matches. If nothing matches, an empty slice and an error
// listing both the offered types and our types are returned.
//
// NOTE(review): offered features are only extracted for ZFS, BTRFS and RSYNC. A BLOCK_AND_RSYNC match
// therefore always ends up with an empty feature list; confirm whether that is intended.
func MatchTypes(offer *MigrationHeader, fallbackType MigrationFSType, ourTypes []Type) ([]Type, error) {
	// Generate an offer types slice from the preferred type supplied from remote and the
	// fallback type supplied based on the content type of the transfer. Skip the fallback when it
	// repeats the preferred type, otherwise each match would be recorded twice.
	offeredFSTypes := []MigrationFSType{offer.GetFs()}
	if fallbackType != offeredFSTypes[0] {
		offeredFSTypes = append(offeredFSTypes, fallbackType)
	}

	matchedTypes := []Type{}

	// Collect every matching type, keeping the order of ourTypes.
	for _, ourType := range ourTypes {
		for _, offerFSType := range offeredFSTypes {
			if offerFSType != ourType.FSType {
				continue // Not a match, try the next one.
			}

			// We got a match, now extract the relevant offered features.
			var offeredFeatures []string

			switch offerFSType {
			case MigrationFSType_ZFS:
				offeredFeatures = offer.GetZfsFeaturesSlice()
			case MigrationFSType_BTRFS:
				offeredFeatures = offer.GetBtrfsFeaturesSlice()
			case MigrationFSType_RSYNC:
				offeredFeatures = offer.GetRsyncFeaturesSlice()
				if !shared.StringInSlice("bidirectional", offeredFeatures) {
					// If no bi-directional support, this means we are getting a response from
					// an old LXD server that doesn't support bidirectional negotiation, so
					// assume LXD 3.7 level. NOTE: Do NOT extend this list of arguments.
					offeredFeatures = []string{"xattrs", "delete", "compress"}
				}
			}

			// Find common features in both our type and offered type.
			commonFeatures := []string{}
			for _, ourFeature := range ourType.Features {
				if shared.StringInSlice(ourFeature, offeredFeatures) {
					commonFeatures = append(commonFeatures, ourFeature)
				}
			}

			// Append type with combined features.
			matchedTypes = append(matchedTypes, Type{
				FSType:   ourType.FSType,
				Features: commonFeatures,
			})
		}
	}

	if len(matchedTypes) < 1 {
		// No matching transport type found, generate an error with offered types and our types.
		offeredTypeStrings := make([]string, 0, len(offeredFSTypes))
		for _, offerFSType := range offeredFSTypes {
			offeredTypeStrings = append(offeredTypeStrings, offerFSType.String())
		}

		ourTypeStrings := make([]string, 0, len(ourTypes))
		for _, ourType := range ourTypes {
			ourTypeStrings = append(ourTypeStrings, ourType.FSType.String())
		}

		return matchedTypes, fmt.Errorf("No matching migration types found. Offered types: %v, our types: %v", offeredTypeStrings, ourTypeStrings)
	}

	return matchedTypes, nil
}
// progressWrapperRender renders a progress line from the transferred byte count and speed and stores
// it under key in the operation's metadata. The line has the form "<size> (<speed>/s)", prefixed with
// "<description>: " when description is not empty. The operation's metadata is only updated when the
// rendered line differs from the value currently stored under key.
func progressWrapperRender(op *operations.Operation, key string, description string, progressInt int64, speedInt int64) {
	metadata := op.Metadata()
	if metadata == nil {
		metadata = make(map[string]interface{})
	}

	transferred := units.GetByteSizeString(progressInt, 2)
	rate := units.GetByteSizeString(speedInt, 2)

	var line string
	if description == "" {
		line = fmt.Sprintf("%s (%s/s)", transferred, rate)
	} else {
		line = fmt.Sprintf("%s: %s (%s/s)", description, transferred, rate)
	}

	// Nothing to publish if the rendered text has not changed since the last update.
	if metadata[key] == line {
		return
	}

	metadata[key] = line
	op.UpdateMetadata(metadata)
}
// ProgressReader returns a function that wraps an io.ReadCloser in an ioprogress.ProgressReader whose
// tracker publishes progress to op's metadata under key, using description as the optional prefix of
// the rendered line. When op is nil the returned function hands the reader back unwrapped.
func ProgressReader(op *operations.Operation, key string, description string) func(io.ReadCloser) io.ReadCloser {
	return func(reader io.ReadCloser) io.ReadCloser {
		// Without an operation there is nowhere to publish progress to.
		if op == nil {
			return reader
		}

		tracker := &ioprogress.ProgressTracker{
			Handler: func(progressInt int64, speedInt int64) {
				progressWrapperRender(op, key, description, progressInt, speedInt)
			},
		}

		return &ioprogress.ProgressReader{
			ReadCloser: reader,
			Tracker:    tracker,
		}
	}
}
// ProgressWriter returns a function that wraps an io.WriteCloser in an ioprogress.ProgressWriter whose
// tracker publishes progress to op's metadata under key, using description as the optional prefix of
// the rendered line. When op is nil the returned function hands the writer back unwrapped.
func ProgressWriter(op *operations.Operation, key string, description string) func(io.WriteCloser) io.WriteCloser {
	return func(writer io.WriteCloser) io.WriteCloser {
		// Without an operation there is nowhere to publish progress to.
		if op == nil {
			return writer
		}

		tracker := &ioprogress.ProgressTracker{
			Handler: func(progressInt int64, speedInt int64) {
				progressWrapperRender(op, key, description, progressInt, speedInt)
			},
		}

		return &ioprogress.ProgressWriter{
			WriteCloser: writer,
			Tracker:     tracker,
		}
	}
}
// ProgressTracker returns a migration I/O tracker whose handler publishes progress to op's metadata
// under key, using description as the optional prefix of the rendered line.
//
// A nil op is tolerated: the handler then does nothing, in line with ProgressReader and ProgressWriter
// which skip progress reporting when no operation is supplied.
func ProgressTracker(op *operations.Operation, key string, description string) *ioprogress.ProgressTracker {
	progress := func(progressInt int64, speedInt int64) {
		// progressWrapperRender calls methods on op, so never hand it a nil operation.
		if op == nil {
			return
		}

		progressWrapperRender(op, key, description, progressInt, speedInt)
	}

	tracker := &ioprogress.ProgressTracker{
		Handler: progress,
	}

	return tracker
}