-
Notifications
You must be signed in to change notification settings - Fork 1
/
eventhelper.go
149 lines (118 loc) · 5.39 KB
/
eventhelper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package producer
import (
"fmt"
"github.com/mcc-github/blockchain/core/ledger/util"
"github.com/mcc-github/blockchain/protos/common"
pb "github.com/mcc-github/blockchain/protos/peer"
"github.com/mcc-github/blockchain/protos/utils"
)
func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event, channelID string, err error) {
blockForEvent := &common.Block{}
filteredBlockForEvent := &pb.FilteredBlock{}
filteredTxArray := []*pb.FilteredTransaction{}
var headerType common.HeaderType
blockForEvent.Header = block.Header
blockForEvent.Metadata = block.Metadata
blockForEvent.Data = &common.BlockData{}
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
for txIndex, d := range block.Data.Data {
ebytes := d
if ebytes != nil {
if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil {
logger.Errorf("error getting tx from block: %s", err)
} else if env != nil {
payload, err := utils.GetPayload(env)
if err != nil {
return nil, nil, "", fmt.Errorf("could not extract payload from envelope: %s", err)
}
if payload.Header == nil {
logger.Debugf("transaction payload header is nil, %d, block num %d",
txIndex, block.Header.Number)
continue
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, nil, "", err
}
channelID = chdr.ChannelId
headerType = common.HeaderType(chdr.Type)
if headerType == common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelID, block.Header.Number, chdr.TxId)
tx, err := utils.GetTransaction(payload.Data)
if err != nil {
return nil, nil, "", fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
}
filteredTx := &pb.FilteredTransaction{Txid: chdr.TxId, TxValidationCode: txsFltr.Flag(txIndex), Type: headerType}
transactionActions := &pb.FilteredTransactionActions{}
for _, action := range tx.Actions {
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(action.Payload)
if err != nil {
return nil, nil, "", fmt.Errorf("error unmarshalling transaction action payload for block event: %s", err)
}
if chaincodeActionPayload.Action == nil {
logger.Debugf("chaincode action, the payload action is nil, skipping")
continue
}
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
if err != nil {
return nil, nil, "", fmt.Errorf("error unmarshalling proposal response payload for block event: %s", err)
}
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
if err != nil {
return nil, nil, "", fmt.Errorf("error unmarshalling chaincode action for block event: %s", err)
}
ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)
if err != nil {
return nil, nil, "", fmt.Errorf("error unmarshalling chaincode event for block event: %s", err)
}
chaincodeAction := &pb.FilteredChaincodeAction{}
if ccEvent.GetChaincodeId() != "" {
filteredCcEvent := ccEvent
filteredCcEvent.Payload = nil
chaincodeAction.ChaincodeEvent = filteredCcEvent
}
transactionActions.ChaincodeActions = append(transactionActions.ChaincodeActions, chaincodeAction)
caPayload.Results = nil
chaincodeActionPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(propRespPayload.ProposalHash, caPayload.Response, caPayload.Results, caPayload.Events, caPayload.ChaincodeId)
if err != nil {
return nil, nil, "", fmt.Errorf("error marshalling tx proposal payload for block event: %s", err)
}
action.Payload, err = utils.GetBytesChaincodeActionPayload(chaincodeActionPayload)
if err != nil {
return nil, nil, "", fmt.Errorf("error marshalling tx action payload for block event: %s", err)
}
}
filteredTx.Data = &pb.FilteredTransaction_TransactionActions{TransactionActions: transactionActions}
filteredTxArray = append(filteredTxArray, filteredTx)
payload.Data, err = utils.GetBytesTransaction(tx)
if err != nil {
return nil, nil, "", fmt.Errorf("error marshalling payload for block event: %s", err)
}
env.Payload, err = utils.GetBytesPayload(payload)
if err != nil {
return nil, nil, "", fmt.Errorf("error marshalling tx envelope for block event: %s", err)
}
ebytes, err = utils.GetBytesEnvelope(env)
if err != nil {
return nil, nil, "", fmt.Errorf("cannot marshal transaction: %s", err)
}
}
}
}
blockForEvent.Data.Data = append(blockForEvent.Data.Data, ebytes)
}
filteredBlockForEvent.ChannelId = channelID
filteredBlockForEvent.Number = block.Header.Number
filteredBlockForEvent.FilteredTransactions = filteredTxArray
return CreateBlockEvent(blockForEvent), CreateFilteredBlockEvent(filteredBlockForEvent), channelID, nil
}
func CreateBlockEvent(te *common.Block) *pb.Event {
return &pb.Event{Event: &pb.Event_Block{Block: te}}
}
func CreateFilteredBlockEvent(te *pb.FilteredBlock) *pb.Event {
return &pb.Event{Event: &pb.Event_FilteredBlock{FilteredBlock: te}}
}