This repository has been archived by the owner on Sep 28, 2021. It is now read-only.
/
transformer.go
318 lines (277 loc) · 11.9 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package transformer
import (
"errors"
"strings"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/config"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/converter"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/fetcher"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/repository"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/header/retriever"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/contract"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/parser"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/poller"
srep "github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/repository"
"github.com/vulcanize/vulcanizedb/pkg/contract_watcher/shared/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
)
// Transformer watches a configured set of contracts: it fetches their event logs
// header by header, converts and persists them, and polls the configured contract methods.
// Requires a header synced vDB (headers) and a running eth node (or infura)
type Transformer struct {
	// Database interfaces
	EventRepository srep.EventRepository // Persists converted event logs for the watched contracts
	HeaderRepository repository.HeaderRepository // Adds check columns, finds unchecked headers and marks headers checked
	// Pre-processing interfaces
	Parser parser.Parser // Parses events and methods out of a contract abi (taken from the config or looked up by contract address)
	Retriever retriever.BlockRetriever // Retrieves the first block, used as the default starting block for a contract
	// Processing interfaces
	Fetcher fetcher.Fetcher // Fetches event logs for the watched addresses and topics at a given header
	Converter converter.ConverterInterface // Converts watched event logs into custom logs, grouped by event name
	Poller poller.Poller // Fetches contract data (e.g. the contract name) and polls the configured methods at a block height
	// Store contract configuration information
	Config config.ContractConfig
	// Store contract info as mapping to contract address
	Contracts map[string]*contract.Contract
	// Internally configured transformer variables; all of these are (re)set by Init
	contractAddresses []string // Holds all contract addresses, for batch fetching of logs
	sortedEventIds map[string][]string // Event column ids grouped by contract address, for post fetch processing and persisting of logs
	sortedMethodIds map[string][]string // Method column ids grouped by contract address, for post fetch method polling
	eventIds []string // Holds event column ids across all contracts, for batch fetching of headers
	eventFilters []common.Hash // Holds topic0 hashes across all contracts, for batch fetching of logs
	Start int64 // Block to start from: Init sets it to the lowest contract starting block, Execute advances it as headers are processed
}
// Order-of-operations:
// 1. Create new transformer
// 2. Load contract addresses and their parameters
// 3. Init
// 4. Execute

// NewTransformer creates a Transformer for the given contract config.
// The blockchain client backs the poller and the log fetcher, the database
// backs the repositories and the block retriever, and the config's network is
// handed to the abi parser. The returned transformer has an empty contract
// map; call Init before Execute.
func NewTransformer(con config.ContractConfig, bc core.BlockChain, db *postgres.DB) *Transformer {
	tr := new(Transformer)

	// Collaborators are constructed in the same order as the original literal
	tr.Poller = poller.NewPoller(bc, db, types.HeaderSync)
	tr.Fetcher = fetcher.NewFetcher(bc)
	tr.Parser = parser.NewParser(con.Network)
	tr.HeaderRepository = repository.NewHeaderRepository(db)
	tr.Retriever = retriever.NewBlockRetriever(db)
	tr.Converter = &converter.Converter{}
	tr.Contracts = make(map[string]*contract.Contract)
	tr.EventRepository = srep.NewEventRepository(db, types.HeaderSync)
	tr.Config = con

	return tr
}
// Init prepares the transformer for Execute; use it after creating and
// configuring the transformer.
// For every address in the config it loads the contract abi into the parser,
// settles the contract's starting block, builds the contract object, and
// registers a checked_headers column for each of its events and methods
// (events also contribute a topic0 filter).
// It returns the first error reported by the parser, the block retriever or
// the header repository; state built up before the error is left in place.
func (tr *Transformer) Init() error {
	// Placeholder for Start; lowered to the smallest contract starting block below
	const unsetStart = 100000000000

	// Reset the internally configured transformer settings
	tr.contractAddresses = []string{}
	tr.sortedEventIds = map[string][]string{}
	tr.sortedMethodIds = map[string][]string{}
	tr.eventIds = []string{}
	tr.eventFilters = []common.Hash{}
	tr.Start = unsetStart

	cfg := &tr.Config
	for addr := range cfg.Addresses {
		// Load the abi into the parser: use the one from the config when present,
		// otherwise let the parser try its internal look-up table and etherscan
		if abiStr := cfg.Abis[addr]; abiStr != "" {
			if err := tr.Parser.ParseAbiStr(abiStr); err != nil {
				return err
			}
		} else if err := tr.Parser.Parse(addr); err != nil {
			return err
		}

		// Begin at the first block known to the retriever, unless the config asks for a later one
		startBlock, err := tr.Retriever.RetrieveFirstBlock()
		if err != nil {
			return err
		}
		if wanted := cfg.StartingBlocks[addr]; wanted > startBlock {
			startBlock = wanted
		}

		// Ask the contract for its name; the outcome is not checked, so the name may stay empty
		var conName string
		tr.Poller.FetchContractData(tr.Parser.Abi(), addr, "name", nil, &conName, -1)

		// Turn the configured argument lists into sets so duplicate inputs collapse
		filterArgs := map[string]bool{}
		for _, a := range cfg.EventArgs[addr] {
			filterArgs[a] = true
		}
		pollArgs := map[string]bool{}
		for _, a := range cfg.MethodArgs[addr] {
			pollArgs[a] = true
		}

		// Bundle everything into a contract object and keep it for execution
		con := contract.Contract{
			Name:          conName,
			Network:       cfg.Network,
			Address:       addr,
			Abi:           tr.Parser.Abi(),
			ParsedAbi:     tr.Parser.ParsedAbi(),
			StartingBlock: startBlock,
			Events:        tr.Parser.GetEvents(cfg.Events[addr]),
			Methods:       tr.Parser.GetSelectMethods(cfg.Methods[addr]),
			FilterArgs:    filterArgs,
			MethodArgs:    pollArgs,
			Piping:        cfg.Piping[addr],
		}.Init()
		tr.Contracts[addr] = con
		conAddr := con.Address
		tr.contractAddresses = append(tr.contractAddresses, conAddr)

		// One checked_headers column per event; the id is tracked per contract and globally
		tr.sortedEventIds[conAddr] = make([]string, 0, len(con.Events))
		for _, ev := range con.Events {
			id := strings.ToLower(ev.Name + "_" + conAddr)
			if err := tr.HeaderRepository.AddCheckColumn(id); err != nil {
				return err
			}
			tr.sortedEventIds[conAddr] = append(tr.sortedEventIds[conAddr], id)
			tr.eventIds = append(tr.eventIds, id)
			// The event signature doubles as a topic0 filter for log fetching
			tr.eventFilters = append(tr.eventFilters, ev.Sig())
		}

		// One checked_headers column per method; tracked per contract only
		tr.sortedMethodIds[conAddr] = make([]string, 0, len(con.Methods))
		for _, method := range con.Methods {
			id := strings.ToLower(method.Name + "_" + conAddr)
			if err := tr.HeaderRepository.AddCheckColumn(id); err != nil {
				return err
			}
			tr.sortedMethodIds[conAddr] = append(tr.sortedMethodIds[conAddr], id)
		}

		// Start ends up as the lowest starting block over all contracts
		if con.StartingBlock < tr.Start {
			tr.Start = con.StartingBlock
		}
	}

	return nil
}
// Execute runs one pass over the headers that are still unchecked for the
// watched events, starting at tr.Start.
// For each header it fetches the logs of all watched contracts in one batch,
// converts and persists them per contract and per event, marks the header
// checked for events that produced nothing, and then polls the configured
// contract methods at that block height.
// It returns an error if no contracts have been initialized, if logs arrive
// for an address that has no initialized contract, or if any repository,
// fetcher, converter or poller call fails. On error tr.Start is left at the
// failing header so the next call resumes there.
func (tr *Transformer) Execute() error {
	if len(tr.Contracts) == 0 {
		return errors.New("error: transformer has no initialized contracts")
	}

	// Find unchecked headers for all events across all contracts; these are returned in asc order
	missingHeaders, err := tr.HeaderRepository.MissingHeadersForAll(tr.Start, -1, tr.eventIds)
	if err != nil {
		return err
	}

	for _, header := range missingHeaders {
		// Point Start at this header first: if an error is returned below without
		// bringing the execution cycle down, the cycle restarts at this header
		tr.Start = header.BlockNumber

		// Fetch all event logs across contracts at this header
		allLogs, err := tr.Fetcher.FetchLogs(tr.contractAddresses, tr.eventFilters, header)
		if err != nil {
			return err
		}

		// If no logs are found mark the header checked for all of the event ids,
		// poll methods, and move on to the next header
		if len(allLogs) < 1 {
			err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, tr.eventIds)
			if err != nil {
				return err
			}
			err = tr.methodPolling(header, tr.sortedMethodIds)
			if err != nil {
				return err
			}
			tr.Start = header.BlockNumber + 1 // Empty header; setup to start at the next header
			continue
		}

		// Group the batch-fetched logs by the (lower-cased) address of the contract they belong to
		sortedLogs := make(map[string][]gethTypes.Log)
		for _, eventLog := range allLogs {
			addr := strings.ToLower(eventLog.Address.Hex())
			sortedLogs[addr] = append(sortedLogs[addr], eventLog)
		}

		// Process logs for each contract
		for conAddr, logs := range sortedLogs {
			// A log address without an initialized contract would otherwise be a
			// nil dereference below; report it instead of panicking
			con, ok := tr.Contracts[conAddr]
			if !ok || con == nil {
				return errors.New("error: no initialized contract for log address " + conAddr)
			}

			// Configure converter with this contract
			tr.Converter.Update(con)

			// Convert logs into batches of log mappings (eventName => []types.Logs)
			convertedLogs, err := tr.Converter.ConvertBatch(logs, con.Events, header.Id)
			if err != nil {
				return err
			}

			// Cycle through each type of event log and persist them
			for eventName, eventLogs := range convertedLogs {
				// If logs for this event are empty, mark them checked at this header and continue
				if len(eventLogs) < 1 {
					eventId := strings.ToLower(eventName + "_" + con.Address)
					err = tr.HeaderRepository.MarkHeaderChecked(header.Id, eventId)
					if err != nil {
						return err
					}
					continue
				}

				// If logs aren't empty, persist them
				// Header is marked checked in the transactions
				err = tr.EventRepository.PersistLogs(eventLogs, con.Events[eventName], con.Address, con.Name)
				if err != nil {
					return err
				}
			}
		}

		// Contracts that produced no logs at this header get their event ids marked
		// here, mirroring what the no-logs path does for every event id; otherwise
		// this header would stay unmarked for those events
		for addr, ids := range tr.sortedEventIds {
			if len(ids) == 0 {
				continue
			}
			if _, emitted := sortedLogs[strings.ToLower(addr)]; emitted {
				continue
			}
			err = tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, ids)
			if err != nil {
				return err
			}
		}

		// Poll contracts at this block height
		err = tr.methodPolling(header, tr.sortedMethodIds)
		if err != nil {
			return err
		}

		// Success; setup to start at the next header
		tr.Start = header.BlockNumber + 1
	}

	return nil
}
// methodPolling polls contract methods at the given header.
// Every initialized contract that has methods configured, and whose starting
// block is not above the header's block number, is polled at that block
// height; the header is then marked checked for that contract's method ids
// (looked up in sortedMethodIds by contract address).
// The first poller or header repository error is returned as is.
func (tr *Transformer) methodPolling(header core.Header, sortedMethodIds map[string][]string) error {
	for _, watched := range tr.Contracts {
		// Only contracts with methods to poll, and only from their starting block onwards
		if len(watched.Methods) != 0 && header.BlockNumber >= watched.StartingBlock {
			// Poll all methods for this contract at this header
			if err := tr.Poller.PollContractAt(*watched, header.BlockNumber); err != nil {
				return err
			}

			// Record that this header has been handled for the contract's methods
			methodIds := sortedMethodIds[watched.Address]
			if err := tr.HeaderRepository.MarkHeaderCheckedForAll(header.Id, methodIds); err != nil {
				return err
			}
		}
	}

	return nil
}
// GetConfig returns the transformer's contract configuration (its Config field).
func (tr *Transformer) GetConfig() config.ContractConfig {
	return tr.Config
}