-
Notifications
You must be signed in to change notification settings - Fork 36
/
db.go
165 lines (139 loc) · 4.9 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package app
import (
"context"
"time"
"github.com/omni-network/omni/explorer/db/ent"
"github.com/omni-network/omni/explorer/db/ent/xprovidercursor"
"github.com/omni-network/omni/lib/errors"
"github.com/omni-network/omni/lib/log"
"github.com/omni-network/omni/lib/xchain"
)
// newCallback returns the indexer xprovider callback that
// inserts xblocks into the DB. It also updates cursors.
func newCallback(client *ent.Client) xchain.ProviderCallback {
return func(ctx context.Context, block xchain.Block) error {
tx, err := client.BeginTx(ctx, nil)
if err != nil {
return errors.Wrap(err, "begin transaction")
}
if err := InsertBlockTX(ctx, tx, block); err != nil {
if err := tx.Rollback(); err != nil { // Just log on rollback failure
log.Error(ctx, "Rollback transaction failed", err)
}
return errors.Wrap(err, "insert xblock")
}
log.Info(ctx, "Inserted xblock", "chain", block.SourceChainID, "offset", block.BlockOffset, "msgs", len(block.Msgs), "receipts", len(block.Receipts))
return nil
}
}
// InsertBlockTX inserts the block as part of a tx and commits it.
// The caller should handle rollback on any error.
func InsertBlockTX(ctx context.Context, tx *ent.Tx, block xchain.Block) error {
insertedBlock, err := insertBlock(ctx, tx, block)
if err != nil {
return errors.Wrap(err, "insert block")
}
err = insertMessages(ctx, tx, block, insertedBlock)
if err != nil {
return errors.Wrap(err, "insert messages")
}
err = insertReceipts(ctx, tx, block, insertedBlock)
if err != nil {
return errors.Wrap(err, "insert receipts")
}
if err := incrementCursor(ctx, tx, block.SourceChainID, block.BlockHeight, block.BlockOffset); err != nil {
return errors.Wrap(err, "increment cursor")
}
if err := tx.Commit(); err != nil {
return errors.Wrap(err, "commit transaction")
}
return nil
}
// incrementCursor increments the cursor for the given chainID (it ensures it matches height).
func incrementCursor(ctx context.Context, tx *ent.Tx, chainID, height, offset uint64) error {
c, ok, err := cursor(ctx, tx.XProviderCursor, chainID)
if err != nil {
return errors.Wrap(err, "query cursor")
} else if !ok {
return errors.New("cursor not found")
}
// } else if c.Offset > 1 && c.Offset != offset-1 {
// // Sanity check, we MUST insert sequentially (after 1).
// return errors.New("unexpected cursor vs block offset mismatch [BUG]", "cursor_height", c.Height, "block_height", height, "cursor_offset", c.Offset, "block_offset", offset)
// }
if _, err := tx.XProviderCursor.UpdateOne(c).SetHeight(height).SetOffset(offset).Save(ctx); err != nil {
return errors.Wrap(err, "update cursor")
}
return nil
}
// cursor returns the current cursor for the given chainID, or false if it doesn't exist.
func cursor(ctx context.Context, client *ent.XProviderCursorClient, chainID uint64) (*ent.XProviderCursor, bool, error) {
cursor, err := client.Query().Where(xprovidercursor.ChainID(chainID)).Only(ctx)
if ent.IsNotFound(err) {
return nil, false, nil
} else if err != nil {
return nil, false, errors.Wrap(err, "query cursor")
}
return cursor, true, nil
}
func insertBlock(ctx context.Context, tx *ent.Tx, block xchain.Block) (*ent.Block, error) {
b, err := tx.Block.Create().
SetHeight(block.BlockHeight).
SetHash(block.BlockHash[:]).
SetChainID(block.SourceChainID).
SetOffset(block.BlockOffset).
SetTimestamp(block.Timestamp).
Save(ctx)
if err != nil {
return nil, errors.Wrap(err, "inserting block to db")
}
return b, nil
}
func insertMessages(ctx context.Context, tx *ent.Tx, block xchain.Block, dbBlock *ent.Block) error {
for _, m := range block.Msgs {
msg, err := tx.Msg.Create().
SetData(m.Data).
SetTo(m.DestAddress[:]).
SetDestChainID(m.DestChainID).
SetCreatedAt(time.Now()).
SetSourceChainID(m.SourceChainID).
SetGasLimit(m.DestGasLimit).
SetSender(m.SourceMsgSender[:]).
SetOffset(m.StreamOffset).
SetTxHash(m.TxHash[:]).
SetCreatedAt(time.Now()).
Save(ctx)
if err != nil {
return errors.Wrap(err, "inserting message")
}
_, err = tx.Block.UpdateOne(dbBlock).AddMsgs(msg).Save(ctx)
if err != nil {
return errors.Wrap(err, "setting message edge to block")
}
}
return nil
}
func insertReceipts(ctx context.Context, tx *ent.Tx, block xchain.Block, dbBlock *ent.Block) error {
for _, r := range block.Receipts {
receipt, err := tx.Receipt.Create().
SetGasUsed(r.GasUsed).
SetBlockHash(block.BlockHash[:]).
SetDestChainID(r.DestChainID).
SetSourceChainID(r.SourceChainID).
SetOffset(r.StreamOffset).
SetSuccess(r.Success).
SetRelayerAddress(r.RelayerAddress.Bytes()).
SetTxHash(r.TxHash.Bytes()).
SetRevertReason(string(r.Error)).
SetCreatedAt(time.Now()).
Save(ctx)
if err != nil {
return errors.Wrap(err, "inserting message")
}
_, err = tx.Block.UpdateOne(dbBlock).AddReceipts(receipt).Save(ctx)
if err != nil {
return errors.Wrap(err, "setting receipt edge to block")
}
}
return nil
}