/
sqlitechainclient.go
161 lines (144 loc) · 4.59 KB
/
sqlitechainclient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package sqlitechainclient
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/rs/zerolog"
logger "github.com/rs/zerolog/log"
"github.com/textileio/go-tableland/internal/tableland"
)
// SQLiteChainClient is an eventfeed.ChainClient backed by a persisted history of EVM events in a SQLite database.
// An EventFeed can be plugged to this backend instead of Alchemy/Infura.
type SQLiteChainClient struct {
log zerolog.Logger
db *sql.DB
chainID tableland.ChainID
onceChainTip sync.Once
chainTipBlockNumber int64
}
// New returns a new *SQLiteChainClient.
func New(dbURI string, chainID tableland.ChainID) (*SQLiteChainClient, error) {
log := logger.With().
Str("component", "sqlitechainclient").
Int64("chain_id", int64(chainID)).
Logger()
db, err := sql.Open("sqlite3", dbURI)
if err != nil {
return nil, fmt.Errorf("opening db: %s", err)
}
return &SQLiteChainClient{
log: log,
db: db,
chainID: chainID,
}, nil
}
// FilterLogs returns the logs matching a particular filter.
func (scc *SQLiteChainClient) FilterLogs(ctx context.Context, filter ethereum.FilterQuery) ([]types.Log, error) {
if len(filter.Addresses) != 1 {
return nil, fmt.Errorf("the query filter must have a single contract address filter")
}
if filter.BlockHash != nil {
return nil, fmt.Errorf("block_hash filter isn't supported")
}
query := `select address, topics, data, block_number, tx_hash, tx_index, block_hash, event_index
from system_evm_events
where chain_id=?1 and
block_number between ?2 and ?3 and
address=?4
order by block_number asc, event_index asc`
rows, err := scc.db.QueryContext(
ctx,
query,
scc.chainID, filter.FromBlock.Int64(), filter.ToBlock.Int64(), filter.Addresses[0].Hex())
if err != nil {
return nil, fmt.Errorf("get filters in range: %s", err)
}
defer func() {
if err := rows.Close(); err != nil {
scc.log.Error().Err(err).Msg("closing query")
}
}()
var logs []types.Log
for rows.Next() {
if rows.Err() != nil {
return nil, fmt.Errorf("get row: %s", rows.Err())
}
var address, txHash, blockHash string
var data, topicsJSON []byte
var blockNumber uint64
var txIndex, eventIndex uint
if err := rows.Scan(
&address,
&topicsJSON,
&data,
&blockNumber,
&txHash,
&txIndex,
&blockHash,
&eventIndex); err != nil {
return nil, fmt.Errorf("scan row: %s", err)
}
var topicsHex []string
if err := json.Unmarshal(topicsJSON, &topicsHex); err != nil {
return nil, fmt.Errorf("unmarshal json topics: %s", err)
}
topics := make([]common.Hash, len(topicsHex))
for i, topicHex := range topicsHex {
topics[i] = common.HexToHash(topicHex)
}
logs = append(logs, types.Log{
Address: common.HexToAddress(address),
Topics: topics,
Data: data,
BlockNumber: blockNumber,
TxHash: common.HexToHash(txHash),
TxIndex: txIndex,
BlockHash: common.HexToHash(blockHash),
Index: eventIndex,
})
}
return logs, nil
}
// HeaderByNumber returns the block header of the chain.
// Note that it can only be called with block==nil (i.e: last known block) since the underlying SQLite database isn't
// a full replication of the underlying chain.
func (scc *SQLiteChainClient) HeaderByNumber(ctx context.Context, block *big.Int) (*types.Header, error) {
if block != nil {
return nil, errors.New("the current implementation only allows returning the latest block number")
}
scc.onceChainTip.Do(func() {
blockNumber, err := scc.getChainTipBlockNumber(ctx)
if err != nil {
scc.log.Error().Err(err).Msg("loading chain tip block number")
scc.chainTipBlockNumber = -1
scc.onceChainTip = sync.Once{} // Reset to retry in the next `HeaderByNumber(...)` call
return
}
scc.chainTipBlockNumber = blockNumber
})
if scc.chainTipBlockNumber == -1 {
return nil, fmt.Errorf("chain tip block number couldn't be loaded")
}
return &types.Header{
Number: big.NewInt(scc.chainTipBlockNumber),
}, nil
}
func (scc *SQLiteChainClient) getChainTipBlockNumber(ctx context.Context) (int64, error) {
query := "select block_number from system_evm_events where chain_id=?1 order by block_number desc limit 1"
row := scc.db.QueryRowContext(ctx, query, scc.chainID)
if row.Err() == sql.ErrNoRows {
return 0, errors.New("no blocks found")
}
var blockNumber int64
if err := row.Scan(&blockNumber); err != nil {
return 0, fmt.Errorf("reading block_number column: %s", err)
}
return blockNumber, nil
}