forked from hyperledger/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
systemchannelfilter.go
162 lines (128 loc) · 4.88 KB
/
systemchannelfilter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package msgprocessor
import (
"github.com/hyperledger/fabric/common/channelconfig"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// ChainCreator defines the methods necessary to simulate channel creation.
type ChainCreator interface {
// NewChannelConfig returns a template config for a new channel.
NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error)
// CreateBundle parses the config into resources
CreateBundle(channelID string, config *cb.Config) (channelconfig.Resources, error)
// ChannelsCount returns the count of channels which currently exist.
ChannelsCount() int
}
// LimitedSupport defines the subset of the channel resources required by the systemchannel filter.
type LimitedSupport interface {
OrdererConfig() (channelconfig.Orderer, bool)
}
// SystemChainFilter implements the filter.Rule interface.
type SystemChainFilter struct {
cc ChainCreator
support LimitedSupport
}
// NewSystemChannelFilter returns a new instance of a *SystemChainFilter.
func NewSystemChannelFilter(ls LimitedSupport, cc ChainCreator) *SystemChainFilter {
return &SystemChainFilter{
support: ls,
cc: cc,
}
}
// Apply rejects bad messages with an error.
func (scf *SystemChainFilter) Apply(env *cb.Envelope) error {
msgData := &cb.Payload{}
err := proto.Unmarshal(env.Payload, msgData)
if err != nil {
return errors.Errorf("bad payload: %s", err)
}
if msgData.Header == nil {
return errors.Errorf("missing payload header")
}
chdr, err := utils.UnmarshalChannelHeader(msgData.Header.ChannelHeader)
if err != nil {
return errors.Errorf("bad channel header: %s", err)
}
if chdr.Type != int32(cb.HeaderType_ORDERER_TRANSACTION) {
return nil
}
ordererConfig, ok := scf.support.OrdererConfig()
if !ok {
logger.Panicf("System channel does not have orderer config")
}
maxChannels := ordererConfig.MaxChannelsCount()
if maxChannels > 0 {
// We check for strictly greater than to accommodate the system channel
if uint64(scf.cc.ChannelsCount()) > maxChannels {
return errors.Errorf("channel creation would exceed maximimum number of channels: %d", maxChannels)
}
}
configTx := &cb.Envelope{}
err = proto.Unmarshal(msgData.Data, configTx)
if err != nil {
return errors.Errorf("payload data error unmarshaling to envelope: %s", err)
}
return scf.authorizeAndInspect(configTx)
}
func (scf *SystemChainFilter) authorizeAndInspect(configTx *cb.Envelope) error {
payload := &cb.Payload{}
err := proto.Unmarshal(configTx.Payload, payload)
if err != nil {
return errors.Errorf("error unmarshaling wrapped configtx envelope payload: %s", err)
}
if payload.Header == nil {
return errors.Errorf("wrapped configtx envelope missing header")
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return errors.Errorf("error unmarshaling wrapped configtx envelope channel header: %s", err)
}
if chdr.Type != int32(cb.HeaderType_CONFIG) {
return errors.Errorf("wrapped configtx envelope not a config transaction")
}
configEnvelope := &cb.ConfigEnvelope{}
err = proto.Unmarshal(payload.Data, configEnvelope)
if err != nil {
return errors.Errorf("error unmarshalling wrapped configtx config envelope from payload: %s", err)
}
if configEnvelope.LastUpdate == nil {
return errors.Errorf("updated config does not include a config update")
}
res, err := scf.cc.NewChannelConfig(configEnvelope.LastUpdate)
if err != nil {
return errors.Errorf("error constructing new channel config from update: %s", err)
}
// Make sure that the config was signed by the appropriate authorized entities
newChannelConfigEnv, err := res.ConfigtxValidator().ProposeConfigUpdate(configEnvelope.LastUpdate)
if err != nil {
return errors.Errorf("error proposing channel update to new channel config: %s", err)
}
// reflect.DeepEqual will not work here, because it considers nil and empty maps as unequal
if !proto.Equal(newChannelConfigEnv, configEnvelope) {
return errors.Errorf("config proposed by the channel creation request did not match the config received with the channel creation request")
}
bundle, err := scf.cc.CreateBundle(res.ConfigtxValidator().ChainID(), newChannelConfigEnv.Config)
if err != nil {
return errors.Wrap(err, "config does not validly parse")
}
if err = res.ValidateNew(bundle); err != nil {
return errors.Wrap(err, "new bundle invalid")
}
oc, ok := bundle.OrdererConfig()
if !ok {
return errors.New("config is missing orderer group")
}
if err = oc.Capabilities().Supported(); err != nil {
return errors.Wrap(err, "config update is not compatible")
}
if err = bundle.ChannelConfig().Capabilities().Supported(); err != nil {
return errors.Wrap(err, "config update is not compatible")
}
return nil
}