/
evacuate.go
189 lines (156 loc) · 4.74 KB
/
evacuate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package engine
import (
"errors"
"fmt"
"github.com/nspcc-dev/hrw/v2"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
"github.com/nspcc-dev/neofs-node/pkg/util"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)
// EvacuateShardPrm groups the parameters of the StorageEngine.Evacuate
// operation. Fill it through the With* setters.
type EvacuateShardPrm struct {
// shardID lists the IDs of the shards whose objects must be moved away.
shardID []*shard.ID
// handler, when non-nil, is called for every object that could not be
// saved on any other shard; a nil error from it counts the object as
// evacuated.
handler func(oid.Address, *objectSDK.Object) error
// ignoreErrors makes Evacuate skip objects that fail to be read from the
// source shard instead of aborting.
ignoreErrors bool
}
// EvacuateShardRes holds the result of the StorageEngine.Evacuate operation.
type EvacuateShardRes struct {
// count is the number of objects moved to another shard or accepted by
// the fault handler.
count int
}
// WithShardIDList specifies the IDs of the shards to be evacuated.
// The slice is stored as is, without copying.
func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
	p.shardID = id
}
// WithIgnoreErrors toggles the ignore-errors flag of the operation:
// pass true to enable it, false to disable it.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
	p.ignoreErrors = ignore
}
// WithFaultHandler installs the callback invoked for every object that
// cannot be saved on any other shard. The callback receives the object
// address and the object itself.
func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) {
	p.handler = f
}
// Count reports the number of evacuated objects.
// An object for which the fault handler returned no error is counted as
// evacuated too.
func (r EvacuateShardRes) Count() int {
	return r.count
}
// defaultEvacuateBatchSize is the count passed to the shard listing
// parameters for every ListWithCursor call made by Evacuate.
const defaultEvacuateBatchSize = 100
// pooledShard couples a shard with the worker pool registered for it in the
// engine, so both can be passed to putToShard after HRW sorting.
type pooledShard struct {
hashedShard
pool util.WorkerPool
}
// errMustHaveTwoShards is returned by Evacuate when no shard would remain
// outside the evacuated set and no fault handler is configured.
var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
// Evacuate moves objects from the shards listed in prm to the other shards
// of the engine.
//
// Every listed shard must be known to the engine and be in read-only mode;
// otherwise errShardNotFound or shard.ErrMustBeReadOnly is returned. A nil
// shard ID is reported as errShardNotFound. Duplicate IDs are processed once
// and do not count twice against the number of spare shards. If no spare
// shard remains and no fault handler is set, errMustHaveTwoShards is
// returned.
//
// For each object the shards are sorted by HRW weight of the object address
// and tried in that order, skipping the shards being evacuated. If no shard
// accepts the object, the fault handler is called; without a handler the
// operation fails with an error wrapping errPutShard. Objects that fail to
// be read from the source shard abort the operation unless the ignore-errors
// flag is set, in which case they are skipped.
//
// On failure the returned result still carries the number of objects
// evacuated so far.
func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
	// Deduplicate IDs: a repeated ID must neither shrink the computed number
	// of spare shards nor cause the same shard to be scanned twice.
	sidList := make([]string, 0, len(prm.shardID))
	evacuated := make(map[string]struct{}, len(prm.shardID))
	for _, id := range prm.shardID {
		if id == nil {
			return EvacuateShardRes{}, errShardNotFound
		}
		sid := id.String()
		if _, dup := evacuated[sid]; dup {
			continue
		}
		evacuated[sid] = struct{}{}
		sidList = append(sidList, sid)
	}

	shards, err := e.collectEvacuationShards(sidList, prm.handler != nil)
	if err != nil {
		return EvacuateShardRes{}, err
	}

	e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList))

	// Resolve the source shards from the snapshot taken under the lock.
	shardMap := make(map[string]*shard.Shard, len(sidList))
	for i := range shards {
		sid := shards[i].ID().String()
		if _, ok := evacuated[sid]; ok {
			shardMap[sid] = shards[i].Shard
		}
	}

	var listPrm shard.ListWithCursorPrm
	listPrm.WithCount(defaultEvacuateBatchSize)

	var res EvacuateShardRes

mainLoop:
	for n := range sidList {
		sh := shardMap[sidList[n]]

		var c *meta.Cursor
		for {
			listPrm.WithCursor(c)

			// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
			//  because ListWithCursor works only with the metabase.
			listRes, err := sh.ListWithCursor(listPrm)
			if err != nil {
				// End of listing (or a degraded shard) finishes this shard;
				// proceed with the next one.
				if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) {
					continue mainLoop
				}
				return res, err
			}

			// TODO (@fyrchik): #1731 parallelize the loop
			lst := listRes.AddressList()

		loop:
			for i := range lst {
				addr := lst[i].Address
				addrHash := hrw.WrapBytes([]byte(addr.EncodeToString()))

				var getPrm shard.GetPrm
				getPrm.SetAddress(addr)

				getRes, err := sh.Get(getPrm)
				if err != nil {
					if prm.ignoreErrors {
						continue
					}
					return res, err
				}

				// The index of a shard in the sorted slice is passed to
				// putToShard, hence all shards (including the evacuated
				// ones) take part in sorting.
				hrw.Sort(shards, addrHash)
				for j := range shards {
					if _, ok := shardMap[shards[j].ID().String()]; ok {
						// Never put back to a shard being evacuated.
						continue
					}
					putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()})
					if putDone || exists {
						// Only actual moves are logged and counted.
						if putDone {
							e.log.Debug("object is moved to another shard",
								zap.String("from", sidList[n]),
								zap.Stringer("to", shards[j].ID()),
								zap.Stringer("addr", addr))

							res.count++
						}
						continue loop
					}
				}

				if prm.handler == nil {
					// Do not check ignoreErrors flag here because
					// ignoring errors on put make this command kinda useless.
					return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
				}

				err = prm.handler(addr, getRes.Object())
				if err != nil {
					return res, err
				}
				res.count++
			}

			c = listRes.Cursor()
		}
	}

	e.log.Info("finished shards evacuation",
		zap.Strings("shard_ids", sidList))

	return res, nil
}

// collectEvacuationShards validates the shards identified by sidList and
// returns a snapshot of all engine shards paired with their worker pools.
// The whole check-and-copy is done under the engine read lock.
//
// sidList must not contain duplicates. hasHandler tells whether a fault
// handler is configured, which lifts the requirement to have a spare shard.
func (e *StorageEngine) collectEvacuationShards(sidList []string, hasHandler bool) ([]pooledShard, error) {
	e.mtx.RLock()
	defer e.mtx.RUnlock()

	for _, sid := range sidList {
		sh, ok := e.shards[sid]
		if !ok {
			return nil, errShardNotFound
		}
		if !sh.GetMode().ReadOnly() {
			return nil, shard.ErrMustBeReadOnly
		}
	}

	if len(e.shards)-len(sidList) < 1 && !hasHandler {
		return nil, errMustHaveTwoShards
	}

	// We must have all shards, to have correct information about their
	// indexes in a sorted slice and set appropriate marks in the metabase.
	// Evacuated shard is skipped during put.
	shards := make([]pooledShard, 0, len(e.shards))
	for id := range e.shards {
		shards = append(shards, pooledShard{
			hashedShard: hashedShard(e.shards[id]),
			pool:        e.shardPools[id],
		})
	}
	return shards, nil
}