-
Notifications
You must be signed in to change notification settings - Fork 7
/
client.go
201 lines (171 loc) · 7.08 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package block
import (
"context"
"cosmossdk.io/depinject"
cometclient "github.com/cosmos/cosmos-sdk/client"
"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/events"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
)
const (
// committedBlocksQuery is the events query that NewBlockClient passes to
// events.NewEventsReplayClient in order to subscribe to newly committed
// block events from the chain.
// See: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events
committedBlocksQuery = "tm.event='NewBlock'"
// defaultBlocksReplayLimit is the replay limit that NewBlockClient passes to
// events.NewEventsReplayClient, i.e. the number of blocks that the events
// replay client's observable will be able to replay.
// NOTE(review): latestBlockReplayObs is created in NewBlockClient with a
// hard-coded replay buffer size of 10 rather than this constant — confirm
// that the difference is intended.
// TODO_TECHDEBT/TODO_FUTURE: add a `blocksReplayLimit` field to the blockReplayClient
// struct that defaults to this but can be overridden via an option.
defaultBlocksReplayLimit = 100
)
// NewBlockClient creates a new block client from the given dependencies and
// options. It uses the pre-defined committedBlocksQuery to subscribe to newly
// committed block events, which are mapped to Block objects.
//
// This lightly wraps the EventsReplayClient[Block] generic to correctly mock
// the interface.
//
// The returned client owns a context derived from ctx; calling Close() on the
// client cancels it. If construction fails at any step, everything that was
// already started on behalf of the half-built client is torn down before the
// error is returned, so a failed call does not leave a live derived context or
// an open events replay client behind.
//
// Required dependencies:
//   - client.EventsQueryClient
//   - cometclient.CometRPC
func NewBlockClient(
	ctx context.Context,
	deps depinject.Config,
	opts ...client.BlockClientOption,
) (_ client.BlockClient, err error) {
	ctx, cancel := context.WithCancel(ctx)

	// latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a
	// new block, whether it comes from a direct query or an event subscription query.
	latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10)
	bClient := &blockReplayClient{
		latestBlockReplayObs: latestBlockReplayObs,
		close:                cancel,
	}

	// Options are applied before the events replay client is built because
	// connRetryLimit is read from bClient below.
	for _, opt := range opts {
		opt(bClient)
	}

	bClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Block](
		ctx,
		deps,
		committedBlocksQuery,
		UnmarshalNewBlock,
		defaultBlocksReplayLimit,
		events.WithConnRetryLimit[client.Block](bClient.connRetryLimit),
	)
	if err != nil {
		// No events replay client exists yet; only the derived context needs
		// to be released.
		cancel()
		return nil, err
	}

	if err := depinject.Inject(deps, &bClient.onStartQueryClient); err != nil {
		// Close the events replay client and cancel the derived context so
		// that neither outlives this failed construction.
		bClient.Close()
		return nil, err
	}

	bClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh)

	if err := bClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil {
		// Same teardown as above; this also cancels the context used by the
		// forwarding started in asyncForwardBlockEvent.
		bClient.Close()
		return nil, err
	}

	return bClient, nil
}
// blockReplayClient is a BlockClient implementation that combines a CometRPC
// client, used to get its first block at start up, with an EventsReplayClient
// that subscribes to new committed block events.
// It uses a ReplayObservable to retain and replay past observed blocks.
type blockReplayClient struct {
// onStartQueryClient is the RPC client that is used to query for the initial block
// upon blockReplayClient construction. The result of this query is only used if it
// returns before the eventsReplayClient receives its first event.
// It is populated via depinject.Inject in NewBlockClient.
onStartQueryClient cometclient.CometRPC
// eventsReplayClient is the underlying EventsReplayClient that is used to
// subscribe to new committed block events. It is instantiated with
// client.Block as its type parameter; NewBlockClient constructs it with
// committedBlocksQuery and UnmarshalNewBlock so that the events it emits
// are Block objects.
eventsReplayClient client.EventsReplayClient[client.Block]
// latestBlockReplayObs is a replay observable that combines blocks observed by
// the block query client & the events replay client. It is the "canonical"
// source of block notifications for the blockReplayClient.
latestBlockReplayObs observable.ReplayObservable[client.Block]
// close is a function that cancels the context of the blockReplayClient.
// It is the cancel function of the context derived in NewBlockClient.
close context.CancelFunc
// connRetryLimit is the number of times the underlying replay client
// should retry in the event that it encounters an error or its connection is interrupted.
// If connRetryLimit is < 0, it will retry indefinitely.
// It is forwarded to the events replay client via events.WithConnRetryLimit.
connRetryLimit int
}
// CommittedBlocksSequence exposes the client's replay observable of committed
// blocks. The same observable instance is handed back on every call; the ctx
// argument is not consulted.
func (b *blockReplayClient) CommittedBlocksSequence(ctx context.Context) client.BlockReplayObservable {
	committedBlocksObs := b.latestBlockReplayObs
	return committedBlocksObs
}
// LastBlock returns the most recent block observed by the blockReplayClient.
func (b *blockReplayClient) LastBlock(ctx context.Context) (block client.Block) {
	// Ask the replay observable for a single element and take it. This relies
	// on ReplayObservable#Last() yielding at least one element: construction
	// only succeeds once a block has been published or observed, either from
	// the onStartQueryClient query or from the eventsReplayClient.
	lastBlocks := b.latestBlockReplayObs.Last(ctx, 1)
	block = lastBlocks[0]
	return block
}
// Close shuts the block client down: it first closes the underlying
// EventsReplayClient and then cancels the blockReplayClient's own context,
// which closes all downstream connections.
func (b *blockReplayClient) Close() {
	// Tear down the event subscription first.
	b.eventsReplayClient.Close()

	// Then cancel the client's context.
	b.close()
}
// asyncForwardBlockEvent asynchronously observes block event notifications from the
// EventsReplayClient's EventsSequence observable & publishes each to latestBlockPublishCh.
//
// Each publish is abandoned if ctx is done before latestBlockPublishCh accepts
// the block, so that forwarding cannot stay blocked on the send after the
// client's context has been canceled.
func (b *blockReplayClient) asyncForwardBlockEvent(
	ctx context.Context,
	latestBlockPublishCh chan<- client.Block,
) {
	channel.ForEach(ctx, b.eventsReplayClient.EventsSequence(ctx),
		func(ctx context.Context, block client.Block) {
			select {
			case latestBlockPublishCh <- block:
			case <-ctx.Done():
				// Context canceled; drop the block rather than block forever.
			}
		},
	)
}
// getInitialBlock fetches the latest committed on-chain block at the time the
// client starts up, while concurrently waiting for the next block event,
// publishing whichever occurs first to latestBlockPublishCh.
// This is necessary to ensure that the most recent block is available to the
// blockReplayClient when it is first created.
//
// It returns the query error if the latest block query fails before either a
// block or a block event arrives, and ctx.Err() if ctx is done before the
// queried block could be published. It returns nil otherwise.
func (b *blockReplayClient) getInitialBlock(
	ctx context.Context,
	latestBlockPublishCh chan<- client.Block,
) error {
	// The derived context is canceled on return, which releases the query and
	// the temporary subscription made below.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Query the latest block asynchronously.
	// The result channel has capacity 1 so that the query goroutine can always
	// deliver its single result and exit, even when this function has already
	// returned because a block event arrived first.
	blockQueryResultCh := make(chan client.Block, 1)
	queryErrCh := b.queryLatestBlock(ctx, blockQueryResultCh)

	// Wait for either the latest block query response, error, or the first block
	// event to arrive & use whichever occurs first or return an error.
	var initialBlock client.Block
	select {
	case initialBlock = <-blockQueryResultCh:
	case <-b.latestBlockReplayObs.Subscribe(ctx).Ch():
		// A block event was already published to the observable; nothing
		// more to publish.
		return nil
	case err := <-queryErrCh:
		return err
	}

	// At this point blockQueryResultCh was the first to receive the first block.
	// Publish the initialBlock to the latestBlockPublishCh, giving up if the
	// context is done before the channel accepts it.
	select {
	case latestBlockPublishCh <- initialBlock:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// queryLatestBlock uses comet RPC block client to asynchronously query for
// the latest block. It returns an error channel which may be sent a block query error.
// It is *NOT* intended to be called in a goroutine.
//
// On success the block is sent on blockQueryResultCh; on failure the error is
// sent on the returned channel. At most one value is sent in total. The
// spawned goroutine always terminates: the error channel is buffered, and the
// result send is abandoned once ctx is done, so a caller that has stopped
// receiving does not leak it.
func (b *blockReplayClient) queryLatestBlock(
	ctx context.Context,
	blockQueryResultCh chan<- client.Block,
) <-chan error {
	// Capacity 1: the single possible error send never blocks.
	errCh := make(chan error, 1)

	go func() {
		// A nil height is passed to the comet RPC Block call.
		queryBlockResult, err := b.onStartQueryClient.Block(ctx, nil)
		if err != nil {
			errCh <- err
			return
		}

		blockResult := cometBlockResult(*queryBlockResult)
		select {
		case blockQueryResultCh <- &blockResult:
		case <-ctx.Done():
			// The caller is no longer waiting for the result.
		}
	}()

	return errCh
}