-
Notifications
You must be signed in to change notification settings - Fork 36
/
blockchain_source.go
118 lines (104 loc) · 3.37 KB
/
blockchain_source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package sources
import (
"context"
"fmt"
"github.com/tonkeeper/opentonapi/internal/g"
"github.com/tonkeeper/opentonapi/pkg/blockchain/indexer"
"github.com/tonkeeper/tongo"
"github.com/tonkeeper/tongo/abi"
"github.com/tonkeeper/tongo/boc"
"github.com/tonkeeper/tongo/liteapi"
"go.uber.org/zap"
)
// BlockchainSource notifies subscribers about new transactions and block
// headers in the TON blockchain.
//
// Subscriptions are registered through SubscribeToTransactions and
// SubscribeToBlockHeaders; events are produced by the goroutine started in Run.
type BlockchainSource struct {
// txDispatcher receives one TransactionEvent per transaction of every block fed into Run.
txDispatcher txDispatcher
// blockDispatcher receives one BlockEvent per block fed into Run.
blockDispatcher blockDispatcher
// client is the lite API client handed to NewBlockchainSource.
// NOTE(review): it is stored but not referenced by the methods visible in this file.
client *liteapi.Client
// logger is used for debug logging of new subscriptions.
logger *zap.Logger
}
// txDispatcher is the transaction-side dependency of BlockchainSource.
type txDispatcher interface {
	// RegisterSubscriber adds a subscriber described by opts whose events are
	// handed to deliveryFn, and returns the CancelFn associated with that
	// subscription.
	RegisterSubscriber(deliveryFn DeliveryFn, opts SubscribeToTransactionsOptions) CancelFn
	// Run starts the dispatcher and returns the channel on which
	// BlockchainSource.Run publishes TransactionEvent values.
	Run(ctx context.Context) chan TransactionEvent
}
// blockDispatcher is the block-header-side dependency of BlockchainSource.
type blockDispatcher interface {
	// RegisterSubscriber adds a subscriber described by opts whose events are
	// handed to deliveryFn, and returns the CancelFn associated with that
	// subscription.
	RegisterSubscriber(deliveryFn DeliveryFn, opts SubscribeToBlockHeadersOptions) CancelFn
	// Run starts the dispatcher and returns the channel on which
	// BlockchainSource.Run publishes BlockEvent values.
	Run(ctx context.Context) chan BlockEvent
}
// NewBlockchainSource builds a BlockchainSource that logs through logger and
// keeps cli as its lite API client. A fresh transaction dispatcher and a fresh
// block dispatcher are created, both sharing the same logger.
func NewBlockchainSource(logger *zap.Logger, cli *liteapi.Client) *BlockchainSource {
	source := new(BlockchainSource)
	// Dispatchers are created in the same order as the struct declares them.
	source.txDispatcher = NewTransactionDispatcher(logger)
	source.blockDispatcher = NewBlockDispatcher(logger)
	source.client = cli
	source.logger = logger
	return source
}
// Compile-time checks that *BlockchainSource satisfies both source interfaces.
var (
	_ BlockHeadersSource = (*BlockchainSource)(nil)
	_ TransactionSource  = (*BlockchainSource)(nil)
)
// SubscribeToTransactions logs the requested subscription at debug level and
// registers deliveryFn with the transaction dispatcher using opts.
//
// The returned CancelFn is whatever the dispatcher's RegisterSubscriber
// returns. ctx is accepted for interface compatibility and is not used here.
func (b *BlockchainSource) SubscribeToTransactions(ctx context.Context, deliveryFn DeliveryFn, opts SubscribeToTransactionsOptions) CancelFn {
	logFields := []zap.Field{
		zap.Bool("all-accounts", opts.AllAccounts),
		zap.Bool("all-operations", opts.AllOperations),
		zap.Stringers("accounts", opts.Accounts),
		zap.Strings("operations", opts.Operations),
	}
	b.logger.Debug("subscribe to transactions", logFields...)

	cancel := b.txDispatcher.RegisterSubscriber(deliveryFn, opts)
	return cancel
}
// SubscribeToBlockHeaders logs the requested subscription at debug level and
// registers deliveryFn with the block dispatcher using opts.
//
// The returned CancelFn is whatever the dispatcher's RegisterSubscriber
// returns. ctx is accepted for interface compatibility and is not used here.
func (b *BlockchainSource) SubscribeToBlockHeaders(ctx context.Context, deliveryFn DeliveryFn, opts SubscribeToBlockHeadersOptions) CancelFn {
	workchainField := zap.Intp("workchain", opts.Workchain)
	b.logger.Debug("subscribe to blocks", workchainField)

	cancel := b.blockDispatcher.RegisterSubscriber(deliveryFn, opts)
	return cancel
}
// msgOpCodeAndName extracts the 32-bit operation code from the beginning of
// cell and tries to resolve the corresponding operation name.
//
// Results:
//   - (nil, nil) when fewer than 32 bits are available or the read fails;
//   - (opCode, nil) when the opcode was read but abi.MessageDecoder returned an error;
//   - (opCode, opName) when both steps succeeded.
//
// The function reads from cell, so the cell's read position is modified.
func msgOpCodeAndName(cell *boc.Cell) (opCode *uint32, opName *abi.MsgOpName) {
	const opCodeBits = 32

	if cell.BitsAvailableForRead() < opCodeBits {
		return nil, nil
	}
	raw, err := cell.ReadUint(opCodeBits)
	if err != nil {
		return nil, nil
	}
	opCode = g.Pointer(uint32(raw))

	// Undo the opcode read before decoding; presumably the decoder expects to
	// see the cell from its first bit.
	cell.ResetCounters()
	if decodedName, _, err := abi.MessageDecoder(cell); err == nil {
		opName = &decodedName
	}
	return opCode, opName
}
// Run starts both dispatchers and a background goroutine, and returns the
// unbuffered channel through which callers feed new blocks into the source.
//
// For every block received on the returned channel the goroutine publishes one
// BlockEvent to the block dispatcher, followed by one TransactionEvent per
// transaction of the block to the transaction dispatcher. For transactions
// that have an inbound message, the event also carries the message opcode and,
// when it can be decoded, the operation name.
//
// The goroutine stops when ctx is done or when the returned channel is closed.
// Every send to a dispatcher is guarded by ctx.Done() so the goroutine cannot
// stay blocked on a dispatcher that no longer receives after cancellation.
func (b *BlockchainSource) Run(ctx context.Context) chan indexer.IDandBlock {
	newBlockCh := make(chan indexer.IDandBlock)
	go func() {
		txCh := b.txDispatcher.Run(ctx)
		blockCh := b.blockDispatcher.Run(ctx)
		for {
			select {
			case <-ctx.Done():
				return
			case block, ok := <-newBlockCh:
				if !ok {
					// A closed channel yields zero-value blocks forever;
					// treat closing as a request to stop.
					return
				}
				blockEvent := BlockEvent{
					Workchain: block.ID.Workchain,
					Shard:     fmt.Sprintf("%x", block.ID.Shard),
					Seqno:     block.ID.Seqno,
					RootHash:  block.ID.RootHash.Hex(),
					FileHash:  block.ID.FileHash.Hex(),
				}
				select {
				case <-ctx.Done():
					return
				case blockCh <- blockEvent:
				}
				for _, tx := range block.Block.AllTransactions() {
					var msgOpCode *uint32
					var msgOpName *abi.MsgOpName
					if tx.Msgs.InMsg.Exists {
						// Work on a copy so that reading the opcode does not
						// move the read position of the cell stored in the block.
						cell := boc.Cell(tx.Msgs.InMsg.Value.Value.Body.Value)
						msgOpCode, msgOpName = msgOpCodeAndName(&cell)
					}
					txEvent := TransactionEvent{
						AccountID: *tongo.NewAccountId(block.ID.Workchain, tx.AccountAddr),
						Lt:        tx.Lt,
						TxHash:    tx.Hash().Hex(),
						MsgOpName: msgOpName,
						MsgOpCode: msgOpCode,
					}
					select {
					case <-ctx.Done():
						return
					case txCh <- txEvent:
					}
				}
			}
		}
	}()
	return newBlockCh
}