-
Notifications
You must be signed in to change notification settings - Fork 0
/
genesisIndexer.go
129 lines (103 loc) · 3.1 KB
/
genesisIndexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package indexer
import (
"sync"
"context"
"database/sql"
"strings"
"net/http"
"time"
"encoding/json"
"github.com/gorilla/websocket"
log "github.com/inconshreveable/log15"
"github.com/openrelayxyz/cardinal-streams/transports"
"github.com/openrelayxyz/cardinal-types/hexutil"
"github.com/openrelayxyz/cardinal-flume/config"
)
type message struct {
Id int `json:"id"`
Method string `json:"method"`
Params []string `json:"params"`
}
type resultMessage struct {
Type string `json:"type"`
Batch *transports.TransportBatch `json:"batch,omitempty"`
}
type outerResult struct {
Result *resultMessage `json:"result"`
JsonRPC string `json:"jsonrpc"`
Id int `json:"id"`
}
func IndexGenesis(cfg *config.Config, db *sql.DB, indexers []Indexer, mut *sync.RWMutex) error {
if cfg.LatestBlock > 0 {
log.Info("Indexing continuing from block", "number", cfg.LatestBlock)
return nil
}
var wsURL string
for _, broker := range cfg.BrokerParams {
if strings.HasPrefix(broker.URL, "ws://") || strings.HasPrefix(broker.URL, "wss://") {
wsURL = broker.URL
log.Info("found websocket broker, reindexer", "broker", wsURL)
break
}
}
dialer := &websocket.Dialer{
EnableCompression: true,
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
}
conn, _, err := dialer.Dial(wsURL, nil)
if err != nil {
log.Error("Websocket dial error, genesis indexer", "err", err.Error())
return err
}
genesis := uint64(0)
params := []string{hexutil.EncodeUint64(genesis)}
message := message{
Id: 1,
Method: "cardinal_streamsBlock",
Params: params,
}
msg, err := json.Marshal(message)
if err != nil {
log.Error("cannot json marshal message, reindexer, block", genesis, "err", err.Error())
}
if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
log.Error("failed to send message, genesis indexer", "err", err.Error())
}
_, resultBytes, err := conn.ReadMessage()
if err != nil {
log.Error("Error reading transport batch, reindexer, on block", genesis, "err", err.Error())
return err
}
var or *outerResult
if err := json.Unmarshal(resultBytes, &or); err != nil {
log.Error("cannot unmarshal transportBytes, reindexer, on block", genesis, "err", err.Error())
return err
}
pb := or.Result.Batch
genesisStatements := []string{}
for _, indexer := range indexers {
statements, err := indexer.Index(pb.ToPendingBatch())
if err != nil {
log.Error("Error generating statement genesis indexer, on indexer", indexer, "err", err.Error())
return err
}
genesisStatements = append(genesisStatements, statements...)
}
mut.Lock()
dbtx, err := db.BeginTx(context.Background(), nil)
if err != nil {
log.Error("Error creating database transaction genesis indexer", "err", err.Error())
return err
}
if _, err := dbtx.Exec(strings.Join(genesisStatements, " ; ")); err != nil {
log.Error("Failed to execute statement genesis indexer", "err", err.Error())
return err
}
if err := dbtx.Commit(); err != nil {
log.Error("Failed to commit genesis block genesis indexer", "err", err.Error())
return err
}
mut.Unlock()
return nil
}