/
processor.go
173 lines (143 loc) · 4.74 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package container
import (
"errors"
"fmt"
"github.com/nspcc-dev/neo-go/pkg/core/mempoolevent"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// AlphabetState is a callback interface for inner ring global state.
type AlphabetState interface {
	IsAlphabet() bool
}

// Processor of events produced by container contract in the sidechain.
//
// Instances are created with New.
type Processor struct {
	// logger taken from Params.Log
	log *zap.Logger
	// worker pool created by New with Params.PoolSize capacity
	pool *ants.Pool
	// inner ring state callback taken from Params.AlphabetState
	alphabetState AlphabetState
	// Container contract client (notary must be enabled); its contract
	// address is used as the script hash of notary parsers and handlers
	cnrClient *container.Client
	// NeoFS ID contract client taken from Params.NeoFSIDClient
	idClient *neofsid.Client
	// network state source taken from Params.NetworkState
	netState NetworkState
}

// Params of the processor constructor.
//
// New rejects nil Log, AlphabetState, ContainerClient, NeoFSIDClient
// and NetworkState.
type Params struct {
	Log *zap.Logger
	// PoolSize is passed to the worker pool constructor as is.
	PoolSize        int
	AlphabetState   AlphabetState
	ContainerClient *container.Client
	NeoFSIDClient   *neofsid.Client
	NetworkState    NetworkState
}
// NetworkState is an interface of a component
// that provides access to network state.
type NetworkState interface {
// Epoch must return the number of the current epoch.
//
// Must return any error encountered
// which did not allow reading the value.
Epoch() (uint64, error)
// HomomorphicHashDisabled must return boolean that
// represents homomorphic network state:
// * true if hashing is disabled;
// * false if hashing is enabled.
//
// Must return any error encountered
// which did not allow reading the value.
HomomorphicHashDisabled() (bool, error)
}
// New creates a container contract processor instance.
//
// p and its Log, AlphabetState, ContainerClient, NeoFSIDClient and
// NetworkState fields are required: if any of them is nil, New returns
// a nil Processor and an error naming the missing value.
//
// The worker pool is created with p.PoolSize in non-blocking mode. An error
// of the pool constructor is returned wrapped.
func New(p *Params) (*Processor, error) {
	switch {
	case p == nil:
		// checked first: every following case dereferences p
		return nil, errors.New("ir/container: parameters are not set")
	case p.Log == nil:
		return nil, errors.New("ir/container: logger is not set")
	case p.AlphabetState == nil:
		return nil, errors.New("ir/container: global state is not set")
	case p.ContainerClient == nil:
		return nil, errors.New("ir/container: Container client is not set")
	case p.NeoFSIDClient == nil:
		return nil, errors.New("ir/container: NeoFS ID client is not set")
	case p.NetworkState == nil:
		return nil, errors.New("ir/container: network state is not set")
	}

	p.Log.Debug("container worker pool", zap.Int("size", p.PoolSize))

	pool, err := ants.NewPool(p.PoolSize, ants.WithNonblocking(true))
	if err != nil {
		return nil, fmt.Errorf("ir/container: can't create worker pool: %w", err)
	}

	return &Processor{
		log:           p.Log,
		pool:          pool,
		alphabetState: p.AlphabetState,
		cnrClient:     p.ContainerClient,
		idClient:      p.NeoFSIDClient,
		netState:      p.NetworkState,
	}, nil
}
// ListenerNotificationParsers for the 'event.Listener' event producer.
//
// Always returns nil: the processor provides no notification parsers,
// its parsers are listed by ListenerNotaryParsers.
func (cp *Processor) ListenerNotificationParsers() []event.NotificationParserInfo {
return nil
}
// ListenerNotificationHandlers for the 'event.Listener' event producer.
//
// Always returns nil: the processor provides no notification handlers,
// its handlers are listed by ListenerNotaryHandlers.
func (cp *Processor) ListenerNotificationHandlers() []event.NotificationHandlerInfo {
return nil
}
// ListenerNotaryParsers returns notary request parsers for the
// 'event.Listener' notary event producer.
//
// Four entries are returned, in order: container put, named container put,
// container delete and eACL setting. All of them share the
// mempoolevent.TransactionAdded mempool type and the contract address of
// the processor's Container client as the script hash.
func (cp *Processor) ListenerNotaryParsers() []event.NotaryParserInfo {
	// settings common to every parser
	var common event.NotaryParserInfo
	common.SetMempoolType(mempoolevent.TransactionAdded)
	common.SetScriptHash(cp.cnrClient.ContractAddress())

	// container put
	put := common
	put.SetRequestType(containerEvent.PutNotaryEvent)
	put.SetParser(containerEvent.ParsePutNotary)

	// container named put
	putNamed := common
	putNamed.SetRequestType(containerEvent.PutNamedNotaryEvent)
	putNamed.SetParser(containerEvent.ParsePutNamedNotary)

	// container delete
	del := common
	del.SetRequestType(containerEvent.DeleteNotaryEvent)
	del.SetParser(containerEvent.ParseDeleteNotary)

	// set EACL
	setEACL := common
	setEACL.SetRequestType(containerEvent.SetEACLNotaryEvent)
	setEACL.SetParser(containerEvent.ParseSetEACLNotary)

	return []event.NotaryParserInfo{put, putNamed, del, setEACL}
}
// ListenerNotaryHandlers returns notary request handlers for the
// 'event.Listener' notary event producer.
//
// Four entries are returned, in order: container put, named container put,
// container delete and eACL setting. All of them share the contract address
// of the processor's Container client as the script hash and the
// mempoolevent.TransactionAdded mempool type. Both put request types are
// served by the same handler.
func (cp *Processor) ListenerNotaryHandlers() []event.NotaryHandlerInfo {
	// settings common to every handler
	var common event.NotaryHandlerInfo
	common.SetScriptHash(cp.cnrClient.ContractAddress())
	common.SetMempoolType(mempoolevent.TransactionAdded)

	// container put
	put := common
	put.SetRequestType(containerEvent.PutNotaryEvent)
	put.SetHandler(cp.handlePut)

	// container named put: copy of the put entry, so the handler is kept
	putNamed := put
	putNamed.SetRequestType(containerEvent.PutNamedNotaryEvent)

	// container delete
	del := common
	del.SetRequestType(containerEvent.DeleteNotaryEvent)
	del.SetHandler(cp.handleDelete)

	// set eACL
	setEACL := common
	setEACL.SetRequestType(containerEvent.SetEACLNotaryEvent)
	setEACL.SetHandler(cp.handleSetEACL)

	return []event.NotaryHandlerInfo{put, putNamed, del, setEACL}
}
// TimersHandlers for the 'Timers' event producer.
//
// Always returns nil: the processor provides no timer handlers.
func (cp *Processor) TimersHandlers() []event.NotificationHandlerInfo {
return nil
}