-
Notifications
You must be signed in to change notification settings - Fork 178
/
backend_events.go
115 lines (95 loc) · 3.45 KB
/
backend_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package backend
import (
"context"
"fmt"
"time"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
rpcbackend "github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)
// EventsResponse is the per-block payload built by the events backend: the events
// selected for a single block, together with that block's ID and height.
type EventsResponse struct {
// BlockID is the ID of the block the events belong to.
BlockID flow.Identifier
// Height is the height that was requested when the response was built.
Height uint64
// Events holds the block's events after the subscription's filter has been applied.
Events flow.EventsList
}
// EventsBackend serves event subscriptions. Depending on useIndex, events for a height
// are read either from the events index or from the block's execution data.
type EventsBackend struct {
// log is handed to the streamer and used for trace-level logging of responses.
log zerolog.Logger
// headers is used to look up the block ID for a height when reading from the index.
headers storage.Headers
// broadcaster, sendTimeout and responseLimit are passed through to NewStreamer.
broadcaster *engine.Broadcaster
sendTimeout time.Duration
responseLimit float64
// sendBufferSize is passed to NewHeightBasedSubscription.
sendBufferSize int

// getExecutionData fetches the execution data for a height (used when useIndex is false).
getExecutionData GetExecutionDataFunc
// getStartHeight resolves the start block ID / start height pair of a subscription
// request into the first height to stream.
getStartHeight GetStartHeightFunc

// useIndex selects the data source: eventsIndex when true, execution data otherwise.
useIndex bool
// eventsIndex is queried by block ID and height when useIndex is true.
eventsIndex *rpcbackend.EventsIndex
}
// SubscribeEvents creates a subscription that delivers an EventsResponse for each height,
// beginning at the height resolved from startBlockID and startHeight.
//
// The starting point is resolved with b.getStartHeight. If that fails, a failed
// subscription carrying the error is returned and nothing is started. Otherwise a
// height-based subscription is created with a response factory bound to filter, and a
// streamer for it is launched in its own goroutine using ctx.
func (b *EventsBackend) SubscribeEvents(ctx context.Context, startBlockID flow.Identifier, startHeight uint64, filter state_stream.EventFilter) state_stream.Subscription {
	firstHeight, err := b.getStartHeight(startBlockID, startHeight)
	if err != nil {
		return NewFailedSubscription(err, "could not get start height")
	}

	responseFactory := b.getResponseFactory(filter)
	subscription := NewHeightBasedSubscription(b.sendBufferSize, firstHeight, responseFactory)

	// The streamer is constructed synchronously; only Stream runs in the background.
	streamer := NewStreamer(b.log, b.broadcaster, b.sendTimeout, b.responseLimit, subscription)
	go streamer.Stream(ctx)

	return subscription
}
// getResponseFactory returns a function that produces the events response for a given height.
//
// The returned function reads events from the events index when b.useIndex is set and
// from the block's execution data otherwise, applying filter in both cases. On success
// it returns a *EventsResponse; when the logger is at trace level it also logs the block
// ID, height and number of events being sent.
//
// On failure it returns an untyped nil response together with the error, so callers never
// receive a non-nil interface value that wraps a nil *EventsResponse.
func (b *EventsBackend) getResponseFactory(filter state_stream.EventFilter) GetDataByHeightFunc {
	return func(ctx context.Context, height uint64) (interface{}, error) {
		var (
			eventsResponse *EventsResponse
			err            error
		)
		if b.useIndex {
			eventsResponse, err = b.getEventsFromStorage(height, filter)
		} else {
			eventsResponse, err = b.getEventsFromExecutionData(ctx, height, filter)
		}
		if err != nil {
			// Return a literal nil: boxing the nil *EventsResponse into the interface
			// result would make it compare unequal to nil.
			return nil, err
		}

		// Only build the trace entry when it will actually be emitted.
		if b.log.GetLevel() == zerolog.TraceLevel {
			b.log.Trace().
				Hex("block_id", logging.ID(eventsResponse.BlockID)).
				Uint64("height", height).
				Int("events", len(eventsResponse.Events)).
				Msg("sending events")
		}

		return eventsResponse, nil
	}
}
// getEventsFromExecutionData builds the events response for the given height from the
// block's execution data.
//
// The execution data is fetched through b.getExecutionData. The events of each chunk are
// passed through filter and the results are concatenated in iteration order. If the
// execution data cannot be fetched, the error is wrapped and returned with a nil response.
func (b *EventsBackend) getEventsFromExecutionData(ctx context.Context, height uint64, filter state_stream.EventFilter) (*EventsResponse, error) {
	execData, err := b.getExecutionData(ctx, height)
	if err != nil {
		return nil, fmt.Errorf("could not get execution data for block %d: %w", height, err)
	}

	var matched flow.EventsList
	for _, chunk := range execData.ChunkExecutionDatas {
		chunkEvents := filter.Filter(chunk.Events)
		matched = append(matched, chunkEvents...)
	}

	resp := &EventsResponse{
		BlockID: execData.BlockID,
		Height:  height,
		Events:  matched,
	}
	return resp, nil
}
// getEventsFromStorage builds the events response for the given height from the events index.
//
// The block ID for the height is looked up through b.headers, then the events for that
// block are read from b.eventsIndex. The unfiltered event count is logged at trace level
// before filter is applied to produce the response. Lookup errors are wrapped and returned
// with a nil response.
func (b *EventsBackend) getEventsFromStorage(height uint64, filter state_stream.EventFilter) (*EventsResponse, error) {
	id, err := b.headers.BlockIDByHeight(height)
	if err != nil {
		return nil, fmt.Errorf("could not get header for height %d: %w", height, err)
	}

	indexed, err := b.eventsIndex.ByBlockID(id, height)
	if err != nil {
		return nil, fmt.Errorf("could not get events for block %d: %w", height, err)
	}

	// Logged before filtering, so the count reflects everything read from the index.
	trace := b.log.Trace().
		Uint64("height", height).
		Hex("block_id", logging.ID(id)).
		Int("events", len(indexed))
	trace.Msg("events from storage")

	resp := &EventsResponse{
		BlockID: id,
		Height:  height,
	}
	resp.Events = filter.Filter(indexed)
	return resp, nil
}