-
Notifications
You must be signed in to change notification settings - Fork 2
/
subscriptions.block.go
60 lines (51 loc) · 1.59 KB
/
subscriptions.block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package osmosis
import (
"fmt"
"time"
ctypes "github.com/cometbft/cometbft/rpc/core/types"
tmtypes "github.com/cometbft/cometbft/types"
)
func (p *Publisher) subscribeBlocks() error {
return p.rpc.Subscribe(fmt.Sprintf("tm.event='%s'", tmtypes.EventNewBlock), p.handleBlocks)
}
func (p *Publisher) handleBlocks(events <-chan ctypes.ResultEvent) error {
sentinel := p.MakeSentinel(time.Minute)
for {
select {
case <-p.Context.Done():
p.Logger.Info("handleTransactions: c.Context Done")
return nil
case ev, ok := <-events:
if !ok {
p.Logger.Info("handleTransactions: events closed")
return nil
}
if err := sentinel(); err != nil {
return err
}
switch data := ev.Data.(type) {
case tmtypes.EventDataNewBlock:
now := time.Now()
p.Logger.Info("Block START", "hash", data.Block.Hash().String(), "height", data.Block.Height, "time", data.Block.Time, "len(events)", len(events))
p.indexer.SetLatestBlockHeight(uint64(data.Block.Height), data.Block.Time)
p.handleBlock(data.Block)
p.handleMonitoredPools(data.Block.Height, data.Block.Time, data.Block.Hash().String())
p.Logger.Info("Block FINISH", "hash", data.Block.Hash().String(), "height", data.Block.Height, "duration", time.Since(now), "len(events)", len(events))
default:
p.evtOtherCounter.Add(1)
}
}
}
}
func (p *Publisher) handleBlock(block *tmtypes.Block) {
p.blockCounter.Add(1)
p.blockHeight.Set(float64(block.Height))
p.blocksCounter.Add(1)
outBlock := p.rpc.translateBlock(block)
outBlock.Nonce = p.NewNonce()
p.Publish(
outBlock,
"block",
)
p.messagesCounter.Add(1)
}