/
onNewBlock.go
74 lines (69 loc) · 2.88 KB
/
onNewBlock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
Copyright [2019] - [2021], PERSISTENCE TECHNOLOGIES PTE. LTD. and the persistenceBridge contributors
SPDX-License-Identifier: Apache-2.0
*/
package tendermint
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/persistenceOne/persistenceBridge/application/constants"
"github.com/Shopify/sarama"
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/codec"
sdkTypes "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/auth/signing"
"github.com/cosmos/relayer/relayer"
"github.com/persistenceOne/persistenceBridge/application/db"
"github.com/persistenceOne/persistenceBridge/kafka/utils"
"github.com/persistenceOne/persistenceBridge/utilities/logging"
)
func onNewBlock(ctx context.Context, clientCtx client.Context, chain *relayer.Chain, kafkaProducer *sarama.SyncProducer, protoCodec *codec.ProtoCodec) error {
return db.IterateOutgoingTmTx(func(key []byte, value []byte) error {
var tmTx db.OutgoingTendermintTransaction
err := json.Unmarshal(value, &tmTx)
if err != nil {
return fmt.Errorf("failed to unmarshal OutgoingTendermintTransaction %s [TM onNewBlock]: %s", string(key), err.Error())
}
txHashBytes, err := hex.DecodeString(tmTx.TxHash)
if err != nil {
return fmt.Errorf("invalid tx hash %s [TM onNewBlock]: %s", tmTx.TxHash, err.Error())
}
txResult, err := chain.Client.Tx(ctx, txHashBytes, true)
if err != nil {
if err.Error() == fmt.Sprintf("RPC error -32603 - Internal error: tx (%s) not found", tmTx.TxHash) {
logging.Info("Tendermint tx still pending:", tmTx.TxHash)
return nil
}
logging.Error(fmt.Sprintf("Tendermint tx hash %s search failed [TM onNewBlock]: %s", tmTx.TxHash, err.Error()))
return err
}
if txResult.TxResult.GetCode() != 0 {
logging.Error(fmt.Sprintf("Broadcast tendermint tx %s (block: %d) failed, Code: %d, Log: %s", tmTx.TxHash, txResult.Height, txResult.TxResult.GetCode(), txResult.TxResult.Log))
txInterface, err := clientCtx.TxConfig.TxDecoder()(txResult.Tx)
if err != nil {
return err
}
transaction, ok := txInterface.(signing.Tx)
if !ok {
return fmt.Errorf("unable to parse transaction into signing.Tx [TM onNewBlock]")
}
for _, msg := range transaction.GetMsgs() {
if sdkTypes.MsgTypeURL(msg) != constants.MsgWithdrawDelegatorRewardTypeUrl {
msgBytes, err := protoCodec.MarshalInterface(msg)
if err != nil {
return fmt.Errorf("failed to generate msgBytes [TM onNewBlock]: %s", err.Error())
}
err = utils.ProducerDeliverMessage(msgBytes, utils.RetryTendermint, *kafkaProducer)
if err != nil {
return fmt.Errorf("failed to add messages of %s to kafka queue [TM onNewBlock] RetryTendermint: %s", msg.String(), err.Error())
}
}
}
} else {
logging.Info("Broadcast tendermint tx successful. Hash:", tmTx.TxHash, "Block:", txResult.Height)
}
return db.DeleteOutgoingTendermintTx(tmTx.TxHash)
})
}