-
Notifications
You must be signed in to change notification settings - Fork 0
/
streamer.go
155 lines (131 loc) · 3.71 KB
/
streamer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package relay
import (
"context"
"time"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/types"
"github.com/warp-contracts/syncer/src/utils/config"
"github.com/warp-contracts/syncer/src/utils/monitoring"
"github.com/warp-contracts/syncer/src/utils/task"
)
// Streams new blocks from the sequencer
type Streamer struct {
*task.Task
monitor monitoring.Monitor
Output chan *types.Block
client *rpchttp.HTTP
// Control channels
pauseChan chan struct{}
resumeChan chan struct{}
}
// Maintains a persistent websocket connection to the sequencer
// Gets new blocks through the websocket
func NewStreamer(config *config.Config) (self *Streamer) {
self = new(Streamer)
self.pauseChan = make(chan struct{}, 1)
self.resumeChan = make(chan struct{}, 1)
self.Output = make(chan *types.Block, config.Relayer.SequencerQueueSize)
self.Task = task.NewTask(config, "new-block-streamer")
self.Task = self.Task.
WithOnBeforeStart(func() (err error) {
err = self.client.Start()
if err != nil {
self.Log.WithError(err).Error("Failed to start websocket connection")
}
return
}).
WithOnStop(func() {
self.Pause()
err := self.client.Stop()
if err != nil {
self.Log.WithError(err).Error("Failed to stop websocket connection")
}
}).
WithOnAfterStop(func() {
close(self.Output)
}).
WithSubtaskFunc(self.run)
return
}
func (self *Streamer) WithMonitor(v monitoring.Monitor) *Streamer {
self.monitor = v
return self
}
func (self *Streamer) WithClient(client *rpchttp.HTTP) *Streamer {
self.client = client
return self
}
func (self *Streamer) Pause() {
self.pauseChan <- struct{}{}
}
func (self *Streamer) Resume() {
self.resumeChan <- struct{}{}
}
func (self *Streamer) onResume() (out <-chan ctypes.ResultEvent, err error) {
// Subscribe with the query
// Query will be automatically used upon re-subscribing
ctx, cancel := context.WithTimeout(self.Ctx, 10*time.Second)
defer cancel()
return self.client.Subscribe(ctx, "relayer-new-block-streamer", "tm.event='NewBlock'", 1 /* queue size */)
}
func (self *Streamer) onPause() (err error) {
// Subscribe with the query
// Query will be automatically used upon re-subscribing
ctx, cancel := context.WithTimeout(self.Ctx, 10*time.Second)
defer cancel()
return self.client.UnsubscribeAll(ctx, "relayer-new-block-streamer")
}
func (self *Streamer) run() (err error) {
// Streamer starts in PAUSED state
var input <-chan ctypes.ResultEvent
for {
if input == nil {
// State: PAUSED
// Unsubscribed from events, waiting only for control messages
select {
case <-self.Ctx.Done():
return
case <-self.resumeChan:
input, err = self.onResume()
if err != nil {
return
}
}
} else {
// State: RUNNING
// Subscribed to events, waiting for new blocks and a control message to pause
select {
case <-self.Ctx.Done():
return
case <-self.pauseChan:
err = self.onPause()
input = nil
if err != nil {
return
}
// Input channel is still not nil, to receive pending data
case data, ok := <-input:
if !ok {
// Most probably unsubscribed from the websocket
self.Log.Debug("Input channel closed")
input = nil
}
// Neglect other events
event, ok := data.Data.(types.EventDataNewBlock)
if !ok {
self.Log.WithField("data", data).Error("Unexpected data type")
continue
} else {
self.Log.WithField("height", event.Block.Height).Debug("Received a block event from websocket")
}
self.monitor.GetReport().Relayer.State.SequencerBlocksStreamed.Inc()
select {
case <-self.Ctx.Done():
return
case self.Output <- event.Block:
}
}
}
}
}