/
events.go
291 lines (278 loc) · 8.78 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package core
import (
"context"
"fmt"
"github.com/pkg/errors"
tmpubsub "github.com/oneiro-ndev/tendermint.0.32.3/libs/pubsub"
tmquery "github.com/oneiro-ndev/tendermint.0.32.3/libs/pubsub/query"
ctypes "github.com/oneiro-ndev/tendermint.0.32.3/rpc/core/types"
rpctypes "github.com/oneiro-ndev/tendermint.0.32.3/rpc/lib/types"
)
// Subscribe for events via WebSocket.
//
// To tell which events you want, you need to provide a query. query is a
// string, which has a form: "condition AND condition ..." (no OR at the
// moment). condition has a form: "key operation operand". key is a string with
// a restricted set of possible symbols ( \t\n\r\\()"'=>< are not allowed).
// operation can be "=", "<", "<=", ">", ">=", "CONTAINS". operand can be a
// string (escaped with single quotes), number, date or time.
//
// Examples:
// tm.event = 'NewBlock' # new blocks
// tm.event = 'CompleteProposal' # node got a complete proposal
// tm.event = 'Tx' AND tx.hash = 'XYZ' # single transaction
// tm.event = 'Tx' AND tx.height = 5 # all txs of the fifth block
// tx.height = 5 # all txs of the fifth block
//
// Tendermint provides a few predefined keys: tm.event, tx.hash and tx.height.
// Note for transactions, you can define additional keys by providing events with
// DeliverTx response.
//
// import (
// abci "github.com/oneiro-ndev/tendermint.0.32.3/abci/types"
// "github.com/oneiro-ndev/tendermint.0.32.3/libs/pubsub/query"
// )
//
// abci.ResponseDeliverTx{
// Events: []abci.Event{
// {
// Type: "rewards.withdraw",
// Attributes: cmn.KVPairs{
// cmn.KVPair{Key: []byte("address"), Value: []byte("AddrA")},
// cmn.KVPair{Key: []byte("source"), Value: []byte("SrcX")},
// cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
// cmn.KVPair{Key: []byte("balance"), Value: []byte("...")},
// },
// },
// {
// Type: "rewards.withdraw",
// Attributes: cmn.KVPairs{
// cmn.KVPair{Key: []byte("address"), Value: []byte("AddrB")},
// cmn.KVPair{Key: []byte("source"), Value: []byte("SrcY")},
// cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
// cmn.KVPair{Key: []byte("balance"), Value: []byte("...")},
// },
// },
// {
// Type: "transfer",
// Attributes: cmn.KVPairs{
// cmn.KVPair{Key: []byte("sender"), Value: []byte("AddrC")},
// cmn.KVPair{Key: []byte("recipient"), Value: []byte("AddrD")},
// cmn.KVPair{Key: []byte("amount"), Value: []byte("...")},
// },
// },
// },
// }
//
// All events are indexed by a composite key of the form {eventType}.{evenAttrKey}.
// In the above examples, the following keys would be indexed:
// - rewards.withdraw.address
// - rewards.withdraw.source
// - rewards.withdraw.amount
// - rewards.withdraw.balance
// - transfer.sender
// - transfer.recipient
// - transfer.amount
//
// Multiple event types with duplicate keys are allowed and are meant to
// categorize unique and distinct events. In the above example, all events
// indexed under the key `rewards.withdraw.address` will have the following
// values stored and queryable:
//
// - AddrA
// - AddrB
//
// To create a query for txs where address AddrA withdrew rewards:
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA'")
//
// To create a query for txs where address AddrA withdrew rewards from source Y:
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.address = 'AddrA' AND rewards.withdraw.source = 'Y'")
//
// To create a query for txs where AddrA transferred funds:
// query.MustParse("tm.event = 'Tx' AND transfer.sender = 'AddrA'")
//
// The following queries would return no results:
// query.MustParse("tm.event = 'Tx' AND transfer.sender = 'AddrZ'")
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.address = 'AddrZ'")
// query.MustParse("tm.event = 'Tx' AND rewards.withdraw.source = 'W'")
//
// See list of all possible events here
// https://godoc.org/github.com/oneiro-ndev/tendermint.0.32.3/types#pkg-constants
//
// For complete query syntax, check out
// https://godoc.org/github.com/oneiro-ndev/tendermint.0.32.3/libs/pubsub/query.
//
// ```go
// import "github.com/oneiro-ndev/tendermint.0.32.3/types"
//
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
// err := client.Start()
// if err != nil {
// // handle error
// }
// defer client.Stop()
// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
// defer cancel()
// query := "tm.event = 'Tx' AND tx.height = 3"
// txs, err := client.Subscribe(ctx, "test-client", query)
// if err != nil {
// // handle error
// }
//
// go func() {
// for e := range txs {
// fmt.Println("got ", e.Data.(types.EventDataTx))
// }
// }()
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {},
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
//
// ### Query Parameters
//
// | Parameter | Type | Default | Required | Description |
// |-----------+--------+---------+----------+-------------|
// | query | string | "" | true | Query |
//
// <aside class="notice">WebSocket only</aside>
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()
if eventBus.NumClients() >= config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients)
} else if eventBus.NumClientSubscriptions(addr) >= config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient)
}
logger.Info("Subscribe to query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse query")
}
subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout)
defer cancel()
sub, err := eventBus.Subscribe(subCtx, addr, q)
if err != nil {
return nil, err
}
go func() {
for {
select {
case msg := <-sub.Out():
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Events: msg.Events()}
ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
ctx.WSConn.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
resultEvent,
))
case <-sub.Cancelled():
if sub.Err() != tmpubsub.ErrUnsubscribed {
var reason string
if sub.Err() == nil {
reason = "Tendermint exited"
} else {
reason = sub.Err().Error()
}
ctx.WSConn.TryWriteRPCResponse(
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
fmt.Errorf("subscription was cancelled (reason: %s)", reason),
))
}
return
}
}
}()
return &ctypes.ResultSubscribe{}, nil
}
// Unsubscribe from events via WebSocket.
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
// err := client.Start()
// if err != nil {
// // handle error
// }
// defer client.Stop()
// query := "tm.event = 'Tx' AND tx.height = 3"
// err = client.Unsubscribe(context.Background(), "test-client", query)
// if err != nil {
// // handle error
// }
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {},
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
//
// ### Query Parameters
//
// | Parameter | Type | Default | Required | Description |
// |-----------+--------+---------+----------+-------------|
// | query | string | "" | true | Query |
//
// <aside class="notice">WebSocket only</aside>
func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse query")
}
err = eventBus.Unsubscribe(context.Background(), addr, q)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}
// Unsubscribe from all events via WebSocket.
//
// ```go
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
// err := client.Start()
// if err != nil {
// // handle error
// }
// defer client.Stop()
// err = client.UnsubscribeAll(context.Background(), "test-client")
// if err != nil {
// // handle error
// }
// ```
//
// > The above command returns JSON structured like this:
//
// ```json
// {
// "error": "",
// "result": {},
// "id": "",
// "jsonrpc": "2.0"
// }
// ```
//
// <aside class="notice">WebSocket only</aside>
func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr)
err := eventBus.UnsubscribeAll(context.Background(), addr)
if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil
}