-
Notifications
You must be signed in to change notification settings - Fork 178
/
event_emitter.go
279 lines (228 loc) · 6.93 KB
/
event_emitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
package environment
import (
"fmt"
"github.com/onflow/cadence"
jsoncdc "github.com/onflow/cadence/encoding/json"
"github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/fvm/state"
"github.com/onflow/flow-go/fvm/systemcontracts"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/trace"
)
const (
DefaultEventCollectionByteSizeLimit = 256_000 // 256KB
)
type EventEmitterParams struct {
ServiceEventCollectionEnabled bool
EventCollectionByteSizeLimit uint64
}
func DefaultEventEmitterParams() EventEmitterParams {
return EventEmitterParams{
ServiceEventCollectionEnabled: false,
EventCollectionByteSizeLimit: DefaultEventCollectionByteSizeLimit,
}
}
// EventEmitter collect events, separates out service events, and enforces
// event size limits.
//
// Note that scripts do not emit events, but must expose the API in compliance
// with the runtime environment interface.
type EventEmitter interface {
// Cadence's runtime API. Note that the script variant will return
// OperationNotSupportedError.
EmitEvent(event cadence.Event) error
Events() []flow.Event
ServiceEvents() []flow.Event
Reset()
}
type ParseRestrictedEventEmitter struct {
txnState *state.TransactionState
impl EventEmitter
}
func NewParseRestrictedEventEmitter(
txnState *state.TransactionState,
impl EventEmitter,
) EventEmitter {
return ParseRestrictedEventEmitter{
txnState: txnState,
impl: impl,
}
}
func (emitter ParseRestrictedEventEmitter) EmitEvent(event cadence.Event) error {
return parseRestrict1Arg(
emitter.txnState,
"EmitEvent",
emitter.impl.EmitEvent,
event)
}
func (emitter ParseRestrictedEventEmitter) Events() []flow.Event {
return emitter.impl.Events()
}
func (emitter ParseRestrictedEventEmitter) ServiceEvents() []flow.Event {
return emitter.impl.ServiceEvents()
}
func (emitter ParseRestrictedEventEmitter) Reset() {
emitter.impl.Reset()
}
var _ EventEmitter = NoEventEmitter{}
// NoEventEmitter is usually used in the environment for script execution,
// where emitting an event does nothing.
type NoEventEmitter struct{}
func (NoEventEmitter) EmitEvent(event cadence.Event) error {
return nil
}
func (NoEventEmitter) Events() []flow.Event {
return []flow.Event{}
}
func (NoEventEmitter) ServiceEvents() []flow.Event {
return []flow.Event{}
}
func (NoEventEmitter) Reset() {
}
type eventEmitter struct {
tracer *Tracer
meter Meter
chain flow.Chain
txID flow.Identifier
txIndex uint32
payer flow.Address
EventEmitterParams
eventCollection *EventCollection
}
// NewEventEmitter constructs a new eventEmitter
func NewEventEmitter(
tracer *Tracer,
meter Meter,
chain flow.Chain,
txInfo TransactionInfoParams,
params EventEmitterParams,
) EventEmitter {
emitter := &eventEmitter{
tracer: tracer,
meter: meter,
chain: chain,
txID: txInfo.TxId,
txIndex: txInfo.TxIndex,
payer: txInfo.TxBody.Payer,
EventEmitterParams: params,
}
emitter.Reset()
return emitter
}
func (emitter *eventEmitter) Reset() {
// TODO: for now we are not resetting meter here because we don't check meter
// metrics after the first metering failure and when limit is disabled.
emitter.eventCollection = NewEventCollection(emitter.meter)
}
func (emitter *eventEmitter) EventCollection() *EventCollection {
return emitter.eventCollection
}
func (emitter *eventEmitter) EmitEvent(event cadence.Event) error {
defer emitter.tracer.StartExtensiveTracingSpanFromRoot(
trace.FVMEnvEmitEvent).End()
err := emitter.meter.MeterComputation(ComputationKindEmitEvent, 1)
if err != nil {
return fmt.Errorf("emit event failed: %w", err)
}
payload, err := jsoncdc.Encode(event)
if err != nil {
return errors.NewEncodingFailuref(
err,
"failed to json encode a cadence event")
}
payloadSize := uint64(len(payload))
flowEvent := flow.Event{
Type: flow.EventType(event.EventType.ID()),
TransactionID: emitter.txID,
TransactionIndex: emitter.txIndex,
EventIndex: emitter.eventCollection.TotalEventCounter(),
Payload: payload,
}
// TODO: to set limit to maximum when it is service account and get rid of this flag
isServiceAccount := emitter.payer == emitter.chain.ServiceAddress()
if emitter.ServiceEventCollectionEnabled {
ok, err := IsServiceEvent(event, emitter.chain.ChainID())
if err != nil {
return fmt.Errorf("unable to check service event: %w", err)
}
if ok {
eventEmitError := emitter.eventCollection.AppendServiceEvent(flowEvent, payloadSize)
// skip limit if payer is service account
if !isServiceAccount && eventEmitError != nil {
return eventEmitError
}
}
// We don't return and append the service event into event collection
// as well.
}
eventEmitError := emitter.eventCollection.AppendEvent(flowEvent, payloadSize)
// skip limit if payer is service account
if !isServiceAccount {
return eventEmitError
}
return nil
}
func (emitter *eventEmitter) Events() []flow.Event {
return emitter.eventCollection.events
}
func (emitter *eventEmitter) ServiceEvents() []flow.Event {
return emitter.eventCollection.serviceEvents
}
type EventCollection struct {
events flow.EventsList
serviceEvents flow.EventsList
eventCounter uint32
meter Meter
}
func NewEventCollection(meter Meter) *EventCollection {
return &EventCollection{
events: make([]flow.Event, 0, 10),
serviceEvents: make([]flow.Event, 0, 10),
eventCounter: uint32(0),
meter: meter,
}
}
func (collection *EventCollection) Events() []flow.Event {
return collection.events
}
func (collection *EventCollection) AppendEvent(event flow.Event, size uint64) error {
collection.events = append(collection.events, event)
collection.eventCounter++
return collection.meter.MeterEmittedEvent(size)
}
func (collection *EventCollection) ServiceEvents() []flow.Event {
return collection.serviceEvents
}
func (collection *EventCollection) AppendServiceEvent(
event flow.Event,
size uint64,
) error {
collection.serviceEvents = append(collection.serviceEvents, event)
collection.eventCounter++
return collection.meter.MeterEmittedEvent(size)
}
func (collection *EventCollection) TotalByteSize() uint64 {
return collection.meter.TotalEmittedEventBytes()
}
func (collection *EventCollection) TotalEventCounter() uint32 {
return collection.eventCounter
}
// IsServiceEvent determines whether or not an emitted Cadence event is
// considered a service event for the given chain.
func IsServiceEvent(event cadence.Event, chain flow.ChainID) (bool, error) {
// retrieve the service event information for this chain
events, err := systemcontracts.ServiceEventsForChain(chain)
if err != nil {
return false, fmt.Errorf(
"unknown system contracts for chain (%s): %w",
chain.String(),
err)
}
eventType := flow.EventType(event.EventType.ID())
for _, serviceEvent := range events.All() {
if serviceEvent.EventType() == eventType {
return true, nil
}
}
return false, nil
}