-
Notifications
You must be signed in to change notification settings - Fork 38
/
policer.go
242 lines (196 loc) · 5.4 KB
/
policer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package policer
import (
"sync"
"time"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// NodeLoader provides application load statistics.
type nodeLoader interface {
// ObjectServiceLoad returns object service load value in [0:1] range.
ObjectServiceLoad() float64
}
type objectsInWork struct {
m sync.RWMutex
objs map[oid.Address]struct{}
}
func (oiw *objectsInWork) inWork(addr oid.Address) bool {
oiw.m.RLock()
_, ok := oiw.objs[addr]
oiw.m.RUnlock()
return ok
}
func (oiw *objectsInWork) remove(addr oid.Address) {
oiw.m.Lock()
delete(oiw.objs, addr)
oiw.m.Unlock()
}
func (oiw *objectsInWork) add(addr oid.Address) {
oiw.m.Lock()
oiw.objs[addr] = struct{}{}
oiw.m.Unlock()
}
// Policer represents the utility that verifies
// compliance with the object storage policy.
type Policer struct {
*cfg
objsInWork *objectsInWork
}
// Option is an option for Policer constructor.
type Option func(*cfg)
// RedundantCopyCallback is a callback to pass
// the redundant local copy of the object.
type RedundantCopyCallback func(oid.Address)
// Network provides information about the NeoFS network to Policer for work.
type Network interface {
// IsLocalNodeInNetmap checks whether the local node belongs to the current
// network map. If it is impossible to check this fact, IsLocalNodeInNetmap
// returns false.
IsLocalNodeInNetmap() bool
}
type cfg struct {
sync.RWMutex
// available for runtime reconfiguration
headTimeout time.Duration
repCooldown time.Duration
batchSize uint32
maxCapacity uint32
log *zap.Logger
jobQueue jobQueue
cnrSrc container.Source
placementBuilder placement.Builder
remoteHeader *headsvc.RemoteHeader
netmapKeys netmap.AnnouncedKeys
replicator *replicator.Replicator
cbRedundantCopy RedundantCopyCallback
taskPool *ants.Pool
loader nodeLoader
rebalanceFreq time.Duration
network Network
}
func defaultCfg() *cfg {
return &cfg{
log: zap.L(),
batchSize: 10,
rebalanceFreq: 1 * time.Second,
repCooldown: 1 * time.Second,
}
}
// New creates, initializes and returns Policer instance.
func New(opts ...Option) *Policer {
c := defaultCfg()
for i := range opts {
opts[i](c)
}
c.log = c.log.With(zap.String("component", "Object Policer"))
return &Policer{
cfg: c,
objsInWork: &objectsInWork{
objs: make(map[oid.Address]struct{}, c.maxCapacity),
},
}
}
// WithHeadTimeout returns option to set Head timeout of Policer.
func WithHeadTimeout(v time.Duration) Option {
return func(c *cfg) {
c.headTimeout = v
}
}
// WithLogger returns option to set Logger of Policer.
func WithLogger(v *zap.Logger) Option {
return func(c *cfg) {
c.log = v
}
}
// WithLocalStorage returns option to set local object storage of Policer.
func WithLocalStorage(v *engine.StorageEngine) Option {
return func(c *cfg) {
c.jobQueue.localStorage = v
}
}
// WithContainerSource returns option to set container source of Policer.
func WithContainerSource(v container.Source) Option {
return func(c *cfg) {
c.cnrSrc = v
}
}
// WithPlacementBuilder returns option to set object placement builder of Policer.
func WithPlacementBuilder(v placement.Builder) Option {
return func(c *cfg) {
c.placementBuilder = v
}
}
// WithRemoteHeader returns option to set object header receiver of Policer.
func WithRemoteHeader(v *headsvc.RemoteHeader) Option {
return func(c *cfg) {
c.remoteHeader = v
}
}
// WithNetmapKeys returns option to set tool to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
return func(c *cfg) {
c.netmapKeys = v
}
}
// WithReplicator returns option to set object replicator of Policer.
func WithReplicator(v *replicator.Replicator) Option {
return func(c *cfg) {
c.replicator = v
}
}
// WithRedundantCopyCallback returns option to set
// callback to pass redundant local object copies
// detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
return func(c *cfg) {
c.cbRedundantCopy = cb
}
}
// WithMaxCapacity returns option to set max capacity
// that can be set to the pool.
func WithMaxCapacity(capacity uint32) Option {
return func(c *cfg) {
c.maxCapacity = capacity
}
}
// WithPool returns option to set pool for
// policy and replication operations.
func WithPool(p *ants.Pool) Option {
return func(c *cfg) {
c.taskPool = p
}
}
// WithNodeLoader returns option to set NeoFS node load source.
func WithNodeLoader(l nodeLoader) Option {
return func(c *cfg) {
c.loader = l
}
}
// WithNetwork provides Network component.
func WithNetwork(n Network) Option {
return func(c *cfg) {
c.network = n
}
}
// WithReplicationCooldown returns option to set replication
// cooldown: the [Policer] will not submit more than one task
// per a provided time duration.
func WithReplicationCooldown(d time.Duration) Option {
return func(c *cfg) {
c.repCooldown = d
}
}
// WithObjectBatchSize returns option to set maximum objects
// read from the Storage at once.
func WithObjectBatchSize(s uint32) Option {
return func(c *cfg) {
c.batchSize = s
}
}