-
Notifications
You must be signed in to change notification settings - Fork 0
/
standardchannel.go
157 lines (132 loc) · 5.85 KB
/
standardchannel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
Copyright IBM Corp. 2017 All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package msgprocessor
import (
"fmt"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/policies"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
// StandardChannelSupport includes the resources needed for the StandardChannel processor.
// The processor reads the config sequence, channel ID, signer and orderer config through
// this interface, and delegates config-update proposals to it.
type StandardChannelSupport interface {
// Sequence should return the current configSeq
Sequence() uint64
// ChainID returns the ChannelID
ChainID() string
// Signer returns the signer for this orderer
Signer() crypto.LocalSigner
// ProposeConfigUpdate takes in an Envelope of type CONFIG_UPDATE and produces a
// ConfigEnvelope to be used as the Envelope Payload Data of a CONFIG message
ProposeConfigUpdate(configtx *cb.Envelope) (*cb.ConfigEnvelope, error)
// OrdererConfig returns the orderer section of the channel config. The boolean
// reports whether that config is available; ProcessNormalMsg panics when it is false.
OrdererConfig() (channelconfig.Orderer, bool)
}
// StandardChannel implements the Processor interface for standard extant channels.
// Instances are built with NewStandardChannel, which also derives the maintenance
// filter from the supplied support.
type StandardChannel struct {
// support supplies the config sequence, channel ID, signer, orderer config and
// config-update proposal logic used by the Process* methods.
support StandardChannelSupport
filters *RuleSet // Rules applicable to both normal and config messages
// maintenanceFilter is applied in ProcessConfigUpdateMsg to the newly signed CONFIG
// envelope, after the common filters have been re-applied to it.
maintenanceFilter Rule // Rule applicable only to config messages
}
// NewStandardChannel creates a new standard message processor.
//
// The returned processor keeps the given support and filters as-is, and builds
// its config-only maintenance filter from support via NewMaintenanceFilter.
func NewStandardChannel(support StandardChannelSupport, filters *RuleSet) *StandardChannel {
	processor := new(StandardChannel)
	processor.support = support
	processor.filters = filters
	processor.maintenanceFilter = NewMaintenanceFilter(support)
	return processor
}
// CreateStandardChannelFilters creates the set of filters for a normal (non-system) chain.
//
// The rules are assembled in this order: the empty-message reject rule, the
// expiration reject rule, the size filter, and the signature filter.
//
// The signature filter is given both policies.ChannelWriters and
// policies.ChannelOrdererWriters. In maintenance mode it is meant to require the
// orderer writers signature, which filters out configuration changes that are not
// related to consensus-type migration (e.g. on /Channel/Application). The selection
// between the two policies happens inside NewSigFilter, not here.
func CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet {
	rules := []Rule{
		EmptyRejectRule,
		NewExpirationRejectRule(filterSupport),
		NewSizeFilter(filterSupport),
		NewSigFilter(policies.ChannelWriters, policies.ChannelOrdererWriters, filterSupport),
	}
	return NewRuleSet(rules)
}
// ClassifyMsg inspects the message to determine which type of processing is necessary.
//
// CONFIG_UPDATE headers map to ConfigUpdateMsg, ORDERER_TRANSACTION and CONFIG
// headers map to ConfigMsg, and every other header type is a NormalMsg.
func (s *StandardChannel) ClassifyMsg(chdr *cb.ChannelHeader) Classification {
	switch chdr.Type {
	case int32(cb.HeaderType_CONFIG_UPDATE):
		return ConfigUpdateMsg
	case int32(cb.HeaderType_ORDERER_TRANSACTION), int32(cb.HeaderType_CONFIG):
		// Both types must still be classified as config messages in order to
		// maintain backwards compatibility.
		return ConfigMsg
	}
	return NormalMsg
}
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid.
//
// If the ConsensusTypeMigration capability is enabled and the consensus state is not STATE_NORMAL, the message is
// rejected with ErrMaintenanceMode and a sequence of 0, without consulting the filters. If a filter rejects the
// message, the error is returned together with the sequence read just before filtering. The method panics
// (via logger.Panicf) if the orderer config is missing.
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
	ordererConf, found := s.support.OrdererConfig()
	if !found {
		logger.Panicf("Missing orderer config")
	}

	// The consensus state is only consulted when the migration capability is on.
	if ordererConf.Capabilities().ConsensusTypeMigration() &&
		ordererConf.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
		return 0, errors.WithMessage(
			ErrMaintenanceMode, "normal transactions are rejected")
	}

	// Read the sequence before filtering, so that it is never newer than the
	// configuration the message was validated against.
	configSeq = s.support.Sequence()
	err = s.filters.Apply(env)
	return configSeq, err
}
// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
// is invalid, an error is returned.
//
// Steps, in order: read the config sequence, filter the incoming envelope, ask support to propose the config
// update, wrap the result in a signed CONFIG envelope, then run the common filters and the maintenance filter
// on that new envelope. Any failure returns (nil, 0, err).
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
	logger.Debugf("Processing config update message for channel %s", s.support.ChainID())

	// The sequence must be read first. If it advances between proposal and acceptance the
	// message is simply reprocessed; if it were read last, a success could be falsely
	// attributed to a newer configSeq.
	startSeq := s.support.Sequence()

	if err = s.filters.Apply(env); err != nil {
		return nil, 0, err
	}

	proposal, err := s.support.ProposeConfigUpdate(env)
	if err != nil {
		return nil, 0, errors.WithMessage(err, fmt.Sprintf("error applying config update to existing channel '%s'", s.support.ChainID()))
	}

	signed, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), proposal, msgVersion, epoch)
	if err != nil {
		return nil, 0, err
	}

	// The filters are re-applied to the envelope we just built, chiefly so the size filter can
	// ensure it is not too large for our consenter. The repeated signature check is not strictly
	// necessary, but is a good sanity check in case the orderer has not been configured with the
	// right cert material. Its overhead is negligible, as this is the reconfig path and not the
	// normal path.
	if err = s.filters.Apply(signed); err != nil {
		return nil, 0, err
	}

	// Config-only rule, evaluated last.
	if err = s.maintenanceFilter.Apply(signed); err != nil {
		return nil, 0, err
	}

	return signed, startSeq, nil
}
// ProcessConfigMsg takes an envelope of type `HeaderType_CONFIG`, unpacks the `ConfigEnvelope` from it,
// extracts the `ConfigUpdate` from the `LastUpdate` field, and calls `ProcessConfigUpdateMsg` on it.
//
// If the envelope cannot be unmarshalled as a CONFIG message, the error is returned with a nil
// config and a sequence of 0. Otherwise the results of ProcessConfigUpdateMsg are returned unchanged.
func (s *StandardChannel) ProcessConfigMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
	logger.Debugf("Processing config message for channel %s", s.support.ChainID())

	var unpacked cb.ConfigEnvelope
	if _, err = utils.UnmarshalEnvelopeOfType(env, cb.HeaderType_CONFIG, &unpacked); err != nil {
		return nil, 0, err
	}

	// LastUpdate is forwarded as-is; it is not checked for nil here.
	return s.ProcessConfigUpdateMsg(unpacked.LastUpdate)
}