-
Notifications
You must be signed in to change notification settings - Fork 38
/
handlers.go
63 lines (50 loc) · 1.62 KB
/
handlers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package container
import (
"crypto/sha256"
"github.com/mr-tron/base58"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"go.uber.org/zap"
)
// handlePut reacts to a container put notification: it logs the event together
// with the base58-encoded SHA-256 checksum of the data returned by
// Container() and hands further processing over to the worker pool.
//
// ev is converted with an unchecked type assertion to putEvent, so a value
// that does not satisfy it causes a panic.
func (cp *Processor) handlePut(ev event.Event) {
	putEv := ev.(putEvent)

	checksum := sha256.Sum256(putEv.Container())
	encodedID := base58.Encode(checksum[:])

	cp.log.Info("notification",
		zap.String("type", "container put"),
		zap.String("id", encodedID),
	)

	// Processing is handed to the worker pool rather than executed inline.
	task := func() { cp.processContainerPut(putEv) }

	if err := cp.pool.Submit(task); err != nil {
		// Submit reported an error; this is the spot where the system could
		// be switched into a controlled degradation stage.
		cp.log.Warn("container processor worker pool drained",
			zap.Int("capacity", cp.pool.Cap()),
		)
	}
}
// handleDelete reacts to a container delete notification: it logs the event
// with the base58 encoding of the value returned by ContainerID() and hands
// further processing over to the worker pool.
//
// ev is converted with an unchecked type assertion to containerEvent.Delete,
// so a value of any other dynamic type causes a panic.
func (cp *Processor) handleDelete(ev event.Event) {
	delEv := ev.(containerEvent.Delete)

	encodedID := base58.Encode(delEv.ContainerID())

	cp.log.Info("notification",
		zap.String("type", "container delete"),
		zap.String("id", encodedID),
	)

	// Processing is handed to the worker pool rather than executed inline;
	// the processor receives a pointer to this handler's local event copy.
	task := func() { cp.processContainerDelete(&delEv) }

	if err := cp.pool.Submit(task); err != nil {
		// Submit reported an error; this is the spot where the system could
		// be switched into a controlled degradation stage.
		cp.log.Warn("container processor worker pool drained",
			zap.Int("capacity", cp.pool.Cap()),
		)
	}
}
// handleSetEACL reacts to a set EACL notification: it logs the event type and
// hands further processing over to the worker pool.
//
// ev is converted with an unchecked type assertion to containerEvent.SetEACL,
// so a value of any other dynamic type causes a panic.
func (cp *Processor) handleSetEACL(ev event.Event) {
	eaclEv := ev.(containerEvent.SetEACL)

	cp.log.Info("notification", zap.String("type", "set EACL"))

	// Processing is handed to the worker pool rather than executed inline.
	task := func() { cp.processSetEACL(eaclEv) }

	if err := cp.pool.Submit(task); err != nil {
		// Submit reported an error; this is the spot where the system could
		// be switched into a controlled degradation stage.
		cp.log.Warn("container processor worker pool drained",
			zap.Int("capacity", cp.pool.Cap()),
		)
	}
}