This repository has been archived by the owner on Apr 14, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
psql.go
256 lines (226 loc) · 8.27 KB
/
psql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
// Package psql implements an event sink backed by a PostgreSQL database.
package psql
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
"github.com/cosmos/gogoproto/proto"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/pubsub/query"
"github.com/cometbft/cometbft/types"
)
// Names of the database tables that the SQL statements in this file are
// built from, and the database/sql driver name handed to sql.Open.
// NOTE(review): no driver import is visible in this file; presumably a driver
// is registered under driverName elsewhere in the build — confirm.
const (
tableBlocks = "blocks"
tableTxResults = "tx_results"
tableEvents = "events"
tableAttributes = "attributes"
driverName = "postgres"
)
// EventSink is an indexer backend providing the tx/block index services. This
// implementation stores records in a PostgreSQL database using the schema
// defined in state/indexer/sink/psql/schema.sql.
type EventSink struct {
// store is the database handle opened by NewEventSink; every index write
// in this file runs through it, and Stop closes it.
store *sql.DB
// chainID is written alongside each block row and is used, together with
// the height, to look up the block a transaction belongs to.
chainID string
}
// NewEventSink opens a database handle for connStr using the package's
// PostgreSQL driver name and wraps it in an EventSink. Everything the sink
// records is attributed to chainID.
//
// An error from sql.Open is returned unchanged, with a nil sink.
func NewEventSink(connStr, chainID string) (*EventSink, error) {
	store, openErr := sql.Open(driverName, connStr)
	if openErr != nil {
		return nil, openErr
	}

	sink := &EventSink{store: store, chainID: chainID}
	return sink, nil
}
// DB exposes the *sql.DB handle the sink writes through.
// It is exported so that tests can reach the database directly.
func (es *EventSink) DB() *sql.DB {
	handle := es.store
	return handle
}
// runInTransaction begins a new transaction on db and hands it to query.
//
// If the transaction cannot be started, that error is returned and query is
// never called. If query fails, the transaction is rolled back and query's
// error is returned. If query succeeds, the result of the commit is returned.
func runInTransaction(db *sql.DB, query func(*sql.Tx) error) error {
	tx, beginErr := db.Begin()
	if beginErr != nil {
		return beginErr
	}

	queryErr := query(tx)
	if queryErr == nil {
		return tx.Commit()
	}

	// The rollback result is deliberately discarded: the caller should see
	// the error that caused the rollback, not a secondary failure.
	_ = tx.Rollback()
	return queryErr
}
// queryWithID runs query with args inside tx and scans the first column of
// the single resulting row into an ID, which it returns.
//
// Any error from scanning the row is returned unchanged together with a zero
// ID; callers in this file rely on that to detect sql.ErrNoRows.
func queryWithID(tx *sql.Tx, query string, args ...interface{}) (uint32, error) {
	row := tx.QueryRow(query, args...)

	var rowID uint32
	scanErr := row.Scan(&rowID)
	if scanErr != nil {
		return 0, scanErr
	}
	return rowID, nil
}
// insertEvents writes each event in evts to the events table via dbtx and,
// for every attribute of that event flagged for indexing, a row in the
// attributes table keyed by the new event's row ID.
//
// Events are attributed to blockID. When txID is non-zero it is also passed as
// the event's transaction ID; when it is zero the transaction ID argument is
// left nil, which records the event as a block event.
//
// Events with an empty type are skipped. The first database error aborts the
// loop and is returned unchanged.
func insertEvents(dbtx *sql.Tx, blockID, txID uint32, evts []abci.Event) error {
	const insertEventQuery = `
INSERT INTO ` + tableEvents + ` (block_id, tx_id, type)
VALUES ($1, $2, $3)
RETURNING rowid;
`
	const insertAttributeQuery = `
INSERT INTO ` + tableAttributes + ` (event_id, key, composite_key, value)
VALUES ($1, $2, $3, $4);
`

	// Only carry a transaction ID when one was supplied; otherwise the
	// argument stays a nil interface value.
	var txIDArg interface{}
	if txID != 0 {
		txIDArg = txID
	}

	for i := range evts {
		event := &evts[i]
		if event.Type == "" {
			continue // nothing to index for an untyped event
		}

		// The event's row ID links its attributes back to it.
		eventID, err := queryWithID(dbtx, insertEventQuery, blockID, txIDArg, event.Type)
		if err != nil {
			return err
		}

		for j := range event.Attributes {
			attribute := &event.Attributes[j]
			if attribute.Index {
				// The composite key has the form "<event type>.<attribute key>".
				fullKey := event.Type + "." + attribute.Key
				_, err := dbtx.Exec(insertAttributeQuery, eventID, attribute.Key, fullKey, attribute.Value)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// makeIndexedEvent builds an event from a composite key and a value.
//
// The key is split at its first ".": the part before it becomes the event
// type, and the part after it becomes the key of a single indexed attribute
// carrying value. A key with no "." yields an event that has only a type
// (the whole key) and no attributes.
func makeIndexedEvent(compositeKey, value string) abci.Event {
	eventType, attrKey, hasAttr := strings.Cut(compositeKey, ".")
	if !hasAttr {
		return abci.Event{Type: compositeKey}
	}

	attr := abci.EventAttribute{Key: attrKey, Value: value, Index: true}
	return abci.Event{
		Type:       eventType,
		Attributes: []abci.EventAttribute{attr},
	}
}
// IndexBlockEvents indexes the specified block header, part of the
// indexer.EventSink interface.
//
// The block row, the block-height meta-event and the block's own events are
// written in a single transaction. If the insert of the block row returns no
// row (the statement uses ON CONFLICT DO NOTHING), the block is treated as
// already indexed and nil is returned without writing any events.
//
// Errors are wrapped with a short description of the step that failed.
func (es *EventSink) IndexBlockEvents(h types.EventDataNewBlockEvents) error {
	ts := time.Now().UTC()

	return runInTransaction(es.store, func(dbtx *sql.Tx) error {
		// Add the block to the blocks table and report back its row ID for use
		// in indexing the events for the block.
		blockID, err := queryWithID(dbtx, `
INSERT INTO `+tableBlocks+` (height, chain_id, created_at)
VALUES ($1, $2, $3)
ON CONFLICT DO NOTHING
RETURNING rowid;
`, h.Height, es.chainID, ts)
		// errors.Is rather than == so the sentinel is still recognized if it
		// is ever wrapped on its way here.
		if errors.Is(err, sql.ErrNoRows) {
			return nil // we already saw this block; quietly succeed
		}
		if err != nil {
			return fmt.Errorf("indexing block header: %w", err)
		}

		// Insert the special block meta-event for height.
		if err := insertEvents(dbtx, blockID, 0, []abci.Event{
			makeIndexedEvent(types.BlockHeightKey, fmt.Sprint(h.Height)),
		}); err != nil {
			return fmt.Errorf("block meta-events: %w", err)
		}

		// Insert all the block events, in the order they were given.
		if err := insertEvents(dbtx, blockID, 0, h.Events); err != nil {
			return fmt.Errorf("finalizeblock events: %w", err)
		}
		return nil
	})
}
// IndexTxEvents indexes the given transaction results together with their
// events.
//
// Each result is handled in its own database transaction: the block row for
// the result's height and the sink's chain ID is looked up, a tx_results row
// is inserted, and then the hash/height meta-events and the result's own
// events are inserted. The block must therefore already have been indexed,
// otherwise the lookup fails.
//
// If the insert of the tx_results row returns no row (the statement uses
// ON CONFLICT DO NOTHING), that transaction is treated as already indexed and
// skipped. Processing stops at the first error, which is returned; results
// indexed before it remain committed.
func (es *EventSink) IndexTxEvents(txrs []*abci.TxResult) error {
	ts := time.Now().UTC()

	for _, txr := range txrs {
		// Encode the result message in protobuf wire format for indexing.
		resultData, err := proto.Marshal(txr)
		if err != nil {
			return fmt.Errorf("marshaling tx_result: %w", err)
		}

		// Index the hash of the underlying transaction as a hex string.
		txHash := fmt.Sprintf("%X", types.Tx(txr.Tx).Hash())

		if err := runInTransaction(es.store, func(dbtx *sql.Tx) error {
			// Find the block associated with this transaction. The block header
			// must have been indexed prior to the transactions belonging to it.
			blockID, err := queryWithID(dbtx, `
SELECT rowid FROM `+tableBlocks+` WHERE height = $1 AND chain_id = $2;
`, txr.Height, es.chainID)
			if err != nil {
				return fmt.Errorf("finding block ID: %w", err)
			}

			// Insert a record for this tx_result and capture its ID for indexing events.
			txID, err := queryWithID(dbtx, `
INSERT INTO `+tableTxResults+` (block_id, index, created_at, tx_hash, tx_result)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING
RETURNING rowid;
`, blockID, txr.Index, ts, txHash, resultData)
			// errors.Is rather than == so the sentinel is still recognized if
			// it is ever wrapped on its way here.
			if errors.Is(err, sql.ErrNoRows) {
				return nil // we already saw this transaction; quietly succeed
			}
			if err != nil {
				return fmt.Errorf("indexing tx_result: %w", err)
			}

			// Insert the special transaction meta-events for hash and height.
			if err := insertEvents(dbtx, blockID, txID, []abci.Event{
				makeIndexedEvent(types.TxHashKey, txHash),
				makeIndexedEvent(types.TxHeightKey, fmt.Sprint(txr.Height)),
			}); err != nil {
				return fmt.Errorf("indexing transaction meta-events: %w", err)
			}

			// Index any events packaged with the transaction.
			if err := insertEvents(dbtx, blockID, txID, txr.Result.Events); err != nil {
				return fmt.Errorf("indexing transaction events: %w", err)
			}
			return nil
		}); err != nil {
			return err
		}
	}
	return nil
}
// SearchBlockEvents is unsupported by this sink: it ignores its arguments and
// always returns a nil result with an error.
func (es *EventSink) SearchBlockEvents(_ context.Context, _ *query.Query) ([]int64, error) {
	unsupported := errors.New("block search is not supported via the postgres event sink")
	return nil, unsupported
}
// SearchTxEvents is unsupported by this sink: it ignores its arguments and
// always returns a nil result with an error.
func (es *EventSink) SearchTxEvents(_ context.Context, _ *query.Query) ([]*abci.TxResult, error) {
	unsupported := errors.New("tx search is not supported via the postgres event sink")
	return nil, unsupported
}
// GetTxByHash is unsupported by this sink: it ignores the hash and always
// returns a nil result with an error.
func (es *EventSink) GetTxByHash(_ []byte) (*abci.TxResult, error) {
	unsupported := errors.New("getTxByHash is not supported via the postgres event sink")
	return nil, unsupported
}
// HasBlock is unsupported by this sink: it ignores the height and always
// returns false with an error.
func (es *EventSink) HasBlock(_ int64) (bool, error) {
	unsupported := errors.New("hasBlock is not supported via the postgres event sink")
	return false, unsupported
}
// Stop shuts the sink down by closing the underlying PostgreSQL database
// handle, and returns whatever error that close reports.
func (es *EventSink) Stop() error {
	closeErr := es.store.Close()
	return closeErr
}