forked from hyperledger/fabric
/
engine.go
132 lines (111 loc) · 4.5 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
"github.com/hyperledger/fabric/consensus"
"github.com/hyperledger/fabric/core/peer"
"fmt"
"sync"
"github.com/hyperledger/fabric/consensus/controller"
"github.com/hyperledger/fabric/consensus/util"
"github.com/hyperledger/fabric/core/chaincode"
pb "github.com/hyperledger/fabric/protos"
"golang.org/x/net/context"
)
// EngineImpl implements a struct to hold consensus.Consenter, PeerEndpoint and MessageFan
type EngineImpl struct {
consenter consensus.Consenter
helper *Helper
peerEndpoint *pb.PeerEndpoint
consensusFan *util.MessageFan
}
// GetHandlerFactory returns new NewConsensusHandler
func (eng *EngineImpl) GetHandlerFactory() peer.HandlerFactory {
return NewConsensusHandler
}
// ProcessTransactionMsg processes a Message in context of a Transaction
func (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx *pb.Transaction) (response *pb.Response) {
//TODO: Do we always verify security, or can we supply a flag on the invoke ot this functions so to bypass check for locally generated transactions?
if tx.Type == pb.Transaction_CHAINCODE_QUERY {
if !engine.helper.valid {
logger.Warning("Rejecting query because state is currently not valid")
return &pb.Response{Status: pb.Response_FAILURE,
Msg: []byte("Error: state may be inconsistent, cannot query")}
}
// The secHelper is set during creat ChaincodeSupport, so we don't need this step
// cxt := context.WithValue(context.Background(), "security", secHelper)
cxt := context.Background()
//query will ignore events as these are not stored on ledger (and query can report
//"event" data synchronously anyway)
result, _, err := chaincode.Execute(cxt, chaincode.GetChain(chaincode.DefaultChain), tx)
if err != nil {
response = &pb.Response{Status: pb.Response_FAILURE,
Msg: []byte(fmt.Sprintf("Error:%s", err))}
} else {
response = &pb.Response{Status: pb.Response_SUCCESS, Msg: result}
}
} else {
// Chaincode Transaction
response = &pb.Response{Status: pb.Response_SUCCESS, Msg: []byte(tx.Txid)}
//TODO: Do we need to verify security, or can we supply a flag on the invoke ot this functions
// If we fail to marshal or verify the tx, don't send it to consensus plugin
if response.Status == pb.Response_FAILURE {
return response
}
// Pass the message to the consenter (eg. PBFT) NOTE: Make sure engine has been initialized
if eng.consenter == nil {
return &pb.Response{Status: pb.Response_FAILURE, Msg: []byte("Engine not initialized")}
}
// TODO, do we want to put these requests into a queue? This will block until
// the consenter gets around to handling the message, but it also provides some
// natural feedback to the REST API to determine how long it takes to queue messages
err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID)
if err != nil {
response = &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(err.Error())}
}
}
return response
}
func (eng *EngineImpl) setConsenter(consenter consensus.Consenter) *EngineImpl {
eng.consenter = consenter
return eng
}
func (eng *EngineImpl) setPeerEndpoint(peerEndpoint *pb.PeerEndpoint) *EngineImpl {
eng.peerEndpoint = peerEndpoint
return eng
}
var engineOnce sync.Once
var engine *EngineImpl
func getEngineImpl() *EngineImpl {
return engine
}
// GetEngine returns initialized peer.Engine
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
var err error
engineOnce.Do(func() {
engine = new(EngineImpl)
engine.helper = NewHelper(coord)
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()
go func() {
logger.Debug("Starting up message thread for consenter")
// The channel never closes, so this should never break
for msg := range engine.consensusFan.GetOutChannel() {
engine.consenter.RecvMsg(msg.Msg, msg.Sender)
}
}()
})
return engine, err
}