-
Notifications
You must be signed in to change notification settings - Fork 19
/
file_event_source.go
161 lines (136 loc) · 3.94 KB
/
file_event_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Copyright (c) 2022 Gobalsky Labs Limited
//
// Use of this software is governed by the Business Source License included
// in the LICENSE.DATANODE file and at https://www.mariadb.com/bsl11.
//
// Change Date: 18 months from the later of the date of the first publicly
// available Distribution of this version of the repository, and 25 June 2022.
//
// On the date above, in accordance with the Business Source License, use
// of this software will be governed by version 3 or later of the GNU General
// Public License.
package broker
import (
"context"
"encoding/binary"
"fmt"
"io"
"os"
"time"
"code.vegaprotocol.io/vega/core/events"
eventspb "code.vegaprotocol.io/vega/protos/vega/events/v1"
"github.com/golang/protobuf/proto"
)
// fileEventSource is an event source that replays bus events read from a
// file instead of receiving them from a live connection.
type fileEventSource struct {
	// What to replay: the file that is opened for reading, and the chain ID
	// every event read from it is checked against.
	eventsFile string
	chainID    string

	// How to replay: the pause applied whenever the block of the events being
	// read changes, and the capacity of the channel handed out by Receive.
	timeBetweenBlocks     time.Duration
	sendChannelBufferSize int
}
//revive:disable:unexported-return

// NewFileEventSource builds an event source that replays the events stored
// in file.
//
// timeBetweenBlocks is the pause applied between blocks during replay,
// sendChannelBufferSize is the capacity of the events channel created by
// Receive, and chainID is the chain ID every replayed event is checked
// against. The returned error is always nil.
func NewFileEventSource(
	file string,
	timeBetweenBlocks time.Duration,
	sendChannelBufferSize int,
	chainID string,
) (*fileEventSource, error) {
	src := new(fileEventSource)
	src.eventsFile = file
	src.chainID = chainID
	src.timeBetweenBlocks = timeBetweenBlocks
	src.sendChannelBufferSize = sendChannelBufferSize

	return src, nil
}
// Listen does nothing and always returns nil; a file-backed source has no
// connection to set up before Receive is called.
func (fileEventSource) Listen() error {
	return nil
}
// Send ignores the supplied event and always returns nil; this source only
// produces events, it does not accept them.
func (fileEventSource) Send(_ events.Event) error {
	return nil
}
// Receive starts a goroutine that replays the configured events file and
// returns the two channels it writes to: one carrying the replayed events
// (buffered to sendChannelBufferSize) and one carrying an error, if any
// (buffered to 1). The goroutine runs sendAllEvents with the given ctx.
func (e fileEventSource) Receive(ctx context.Context) (<-chan events.Event, <-chan error) {
	var (
		out  = make(chan events.Event, e.sendChannelBufferSize)
		errs = make(chan error, 1)
	)

	go sendAllEvents(ctx, out, e.eventsFile, e.timeBetweenBlocks, errs, e.chainID)

	return out, errs
}
// sendAllEvents replays the bus events stored in file onto out, one block at
// a time.
//
// The file is read as a sequence of records, each made of a 4-byte big-endian
// length followed by that many bytes holding a protobuf-encoded BusEvent.
// Every decoded event has its chain ID checked against chainID. Consecutive
// events with the same Block value are accumulated; when an event for a
// different block is read, the accumulated events are pushed to out and the
// function pauses for timeBetweenBlocks before carrying on.
//
// On failure a single error is written to errorCh and the function returns.
// When the end of the file is reached, the pending events are sent and the
// function then blocks until ctx is cancelled. Cancelling ctx at the top of
// the loop or during the inter-block pause makes the function return without
// reporting an error. Both out and errorCh are closed on return.
func sendAllEvents(ctx context.Context, out chan<- events.Event, file string,
	timeBetweenBlocks time.Duration, errorCh chan<- error, chainID string,
) {
	// Registered first so that it runs last: the file (if opened) is closed
	// before the channels are.
	defer func() {
		close(out)
		close(errorCh)
	}()

	eventFile, err := os.Open(file)
	if err != nil {
		errorCh <- err
		return
	}
	// Only defer the close once the open has succeeded. The file is read-only,
	// so a close error is deliberately ignored.
	defer func() { _ = eventFile.Close() }()

	sizeBytes := make([]byte, 4)
	eventBlock := make([]*eventspb.BusEvent, 0)

	var offset int64

	currentBlock := ""

	for {
		// Stop promptly if the caller has cancelled, otherwise keep reading.
		select {
		case <-ctx.Done():
			return
		default:
		}

		read, err := eventFile.ReadAt(sizeBytes, offset)
		if err == io.EOF {
			// Nothing more to read, send any pending messages. Do not immediately close our
			// output channel, instead sit and wait for our context to be cancelled (e.g. by a
			// shutdown), so as not to trigger a premature exit.
			err = sendBlock(ctx, out, eventBlock)
			if err != nil {
				errorCh <- fmt.Errorf("send block failed:%w", err)
				return
			}
			<-ctx.Done()
			return
		}

		if err != nil {
			errorCh <- fmt.Errorf("error whilst reading message size from events file:%w", err)
			return
		}

		offset += int64(read)

		// The length prefix tells us how many bytes the next event occupies.
		msgSize := binary.BigEndian.Uint32(sizeBytes)
		msgBytes := make([]byte, msgSize)

		read, err = eventFile.ReadAt(msgBytes, offset)
		if err != nil {
			errorCh <- fmt.Errorf("error whilst reading message bytes from events file:%w", err)
			return
		}

		offset += int64(read)

		event := &eventspb.BusEvent{}

		err = proto.Unmarshal(msgBytes, event)
		if err != nil {
			errorCh <- fmt.Errorf("failed to unmarshal bus event: %w", err)
			return
		}

		err = checkChainID(chainID, event.ChainId)
		if err != nil {
			errorCh <- fmt.Errorf("check chain id failed: %w", err)
			return
		}

		if event.Block != currentBlock {
			// A new block has started: flush everything gathered for the
			// previous one before buffering this event.
			if err := sendBlock(ctx, out, eventBlock); err != nil {
				errorCh <- err
				return
			}

			eventBlock = eventBlock[:0]

			// Pause between blocks, but give up straight away if the context
			// is cancelled rather than sleeping through the cancellation.
			if timeBetweenBlocks > 0 {
				pause := time.NewTimer(timeBetweenBlocks)
				select {
				case <-pause.C:
				case <-ctx.Done():
					pause.Stop()
					return
				}
			}

			currentBlock = event.Block
		}

		eventBlock = append(eventBlock, event)
	}
}
// sendBlock converts each bus event in batch with toEvent and writes it to
// out, in order. If ctx is cancelled while a write is pending, the remaining
// events are dropped and ctx.Err() is returned; otherwise the result is nil.
func sendBlock(ctx context.Context, out chan<- events.Event, batch []*eventspb.BusEvent) error {
	cancelled := ctx.Done()

	for i := range batch {
		// The send operand is evaluated once on entering the select, so each
		// event is converted exactly once even when the send never happens.
		select {
		case <-cancelled:
			return ctx.Err()
		case out <- toEvent(ctx, batch[i]):
		}
	}

	return nil
}