-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblockcutter.go
138 lines (112 loc) · 4.63 KB
/
blockcutter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockcutter
import (
"time"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
)
var logger = flogging.MustGetLogger("orderer.common.blockcutter")
type OrdererConfigFetcher interface {
OrdererConfig() (channelconfig.Orderer, bool)
}
// Receiver defines a sink for the ordered broadcast messages
type Receiver interface {
// Ordered should be invoked sequentially as messages are ordered
// Each batch in `messageBatches` will be wrapped into a block.
// `pending` indicates if there are still messages pending in the receiver.
Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool)
// Cut returns the current batch and starts a new one
Cut() []*cb.Envelope
}
type receiver struct {
sharedConfigFetcher OrdererConfigFetcher
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
PendingBatchStartTime time.Time
ChannelID string
Metrics *Metrics
}
// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager
func NewReceiverImpl(channelID string, sharedConfigFetcher OrdererConfigFetcher, metrics *Metrics) Receiver {
return &receiver{
sharedConfigFetcher: sharedConfigFetcher,
Metrics: metrics,
ChannelID: channelID,
}
}
// Ordered should be invoked sequentially as messages are ordered
//
// messageBatches length: 0, pending: false
// - impossible, as we have just received a message
// messageBatches length: 0, pending: true
// - no batch is cut and there are messages pending
// messageBatches length: 1, pending: false
// - the message count reaches BatchSize.MaxMessageCount
// messageBatches length: 1, pending: true
// - the current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// messageBatches length: 2, pending: false
// - the current message size in bytes exceeds BatchSize.PreferredMaxBytes, therefore isolated in its own batch.
// messageBatches length: 2, pending: true
// - impossible
//
// Note that messageBatches can not be greater than 2.
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
if len(r.pendingBatch) == 0 {
// We are beginning a new batch, mark the time
r.PendingBatchStartTime = time.Now()
}
ordererConfig, ok := r.sharedConfigFetcher.OrdererConfig()
if !ok {
logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
}
batchSize := ordererConfig.BatchSize()
messageSizeBytes := messageSizeBytes(msg)
if messageSizeBytes > batchSize.PreferredMaxBytes {
logger.Debugf("The current message, with %v bytes, is larger than the preferred batch size of %v bytes and will be isolated.", messageSizeBytes, batchSize.PreferredMaxBytes)
// cut pending batch, if it has any messages
if len(r.pendingBatch) > 0 {
messageBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
}
// create new batch with single message
messageBatches = append(messageBatches, []*cb.Envelope{msg})
// Record that this batch took no time to fill
r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)
return
}
messageWillOverflowBatchSizeBytes := r.pendingBatchSizeBytes+messageSizeBytes > batchSize.PreferredMaxBytes
if messageWillOverflowBatchSizeBytes {
logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
messageBatch := r.Cut()
r.PendingBatchStartTime = time.Now()
messageBatches = append(messageBatches, messageBatch)
}
logger.Debugf("Enqueuing message into batch")
r.pendingBatch = append(r.pendingBatch, msg)
r.pendingBatchSizeBytes += messageSizeBytes
pending = true
if uint32(len(r.pendingBatch)) >= batchSize.MaxMessageCount {
logger.Debugf("Batch size met, cutting batch")
messageBatch := r.Cut()
messageBatches = append(messageBatches, messageBatch)
pending = false
}
return
}
// Cut returns the current batch and starts a new one
func (r *receiver) Cut() []*cb.Envelope {
r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
r.PendingBatchStartTime = time.Time{}
batch := r.pendingBatch
r.pendingBatch = nil
r.pendingBatchSizeBytes = 0
return batch
}
func messageSizeBytes(message *cb.Envelope) uint32 {
return uint32(len(message.Payload) + len(message.Signature))
}