forked from Hyperledger-TWGC/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
block_gen.go
100 lines (85 loc) · 2.36 KB
/
block_gen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package chainmgmt
import (
"sync"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protoutil"
)
// numConcurrentTxEnvCreators is the number of goroutines that
// startTxEnvCreators launches for each blkGenerator.
const (
numConcurrentTxEnvCreators = 30
)
// txEnvBytes holds the bytes produced by proto.Marshal for one transaction
// envelope; values of this type travel through blkGenerator.txQueue and are
// appended to a block's Data.Data in nextBlock.
type txEnvBytes []byte
// blkGenerator generates blocks in sequence. One instance of blkGenerator is maintained for each chain.
//
// Simulation results submitted through addTx are turned into marshalled
// transaction envelopes by a pool of worker goroutines, and nextBlock
// assembles those envelopes into blocks.
type blkGenerator struct {
// batchConf supplies BatchSize, which is used both as the buffer capacity
// of the two queues and as the maximum number of transactions per block.
batchConf *BatchConf
// blockNum is the number assigned to the next block; nextBlock increments
// it after each block it returns.
blockNum uint64
// previousBlockHash is the header hash of the last generated block (or the
// value supplied at construction); it is used as the next block's PreviousHash.
previousBlockHash []byte
// srQueue carries simulation results from addTx to the worker goroutines.
srQueue chan SimulationResult
// txQueue carries marshalled transaction envelopes from the workers to nextBlock.
txQueue chan txEnvBytes
// wg is waited on in close() before txQueue is closed; the worker
// goroutines signal it when they exit.
wg *sync.WaitGroup
}
// newBlkGenerator builds a blkGenerator whose first block will carry number
// startingBlockNum and previous-hash previousBlockHash. Both internal queues
// are buffered with capacity batchConf.BatchSize. The worker goroutines that
// convert simulation results into transaction envelopes are started before
// the generator is returned.
func newBlkGenerator(batchConf *BatchConf, startingBlockNum uint64, previousBlockHash []byte) *blkGenerator {
	simResults := make(chan SimulationResult, batchConf.BatchSize)
	envelopes := make(chan txEnvBytes, batchConf.BatchSize)

	gen := &blkGenerator{
		batchConf:         batchConf,
		blockNum:          startingBlockNum,
		previousBlockHash: previousBlockHash,
		srQueue:           simResults,
		txQueue:           envelopes,
		wg:                &sync.WaitGroup{},
	}
	gen.startTxEnvCreators()
	return gen
}
// startTxEnvCreators launches numConcurrentTxEnvCreators worker goroutines,
// each running startTxEnvCreator.
//
// All workers are registered with bg.wg here, in the spawning goroutine and
// before any `go` statement. Calling wg.Add from inside the worker would race
// with wg.Wait in close(): Wait could observe a zero counter and return while
// workers are still (or not yet) running, after which close() would close
// txQueue underneath a worker that is about to send on it.
func (bg *blkGenerator) startTxEnvCreators() {
	bg.wg.Add(numConcurrentTxEnvCreators)
	for i := 0; i < numConcurrentTxEnvCreators; i++ {
		go bg.startTxEnvCreator()
	}
}

// startTxEnvCreator is the body of one worker goroutine. It receives
// simulation results from srQueue until that channel is closed and drained,
// converts each one into a transaction envelope via createTxEnv, marshals the
// envelope and sends the bytes on txQueue. Any error from createTxEnv or
// proto.Marshal is passed to panicOnError.
//
// The caller must have already accounted for this worker with bg.wg.Add(1)
// (startTxEnvCreators does so); the matching Done is deferred here so it runs
// when the worker exits.
func (bg *blkGenerator) startTxEnvCreator() {
	defer bg.wg.Done()
	for sr := range bg.srQueue {
		txEnv, err := createTxEnv(sr)
		panicOnError(err)
		envBytes, err := proto.Marshal(txEnv)
		panicOnError(err)
		bg.txQueue <- envBytes
	}
}
// addTx submits a simulation result for inclusion in a future block by
// sending it on srQueue. The send blocks while srQueue's buffer is full.
// It must not be called after close(), which closes srQueue: sending on a
// closed channel panics.
func (bg *blkGenerator) addTx(sr SimulationResult) {
bg.srQueue <- sr
}
// nextBlock assembles and returns the next block in the sequence.
//
// It receives marshalled transaction envelopes from txQueue until it has
// collected batchConf.BatchSize of them or txQueue is closed and drained,
// whichever happens first. If no envelope was received at all (the queue was
// closed with nothing pending) it returns nil and leaves the generator's
// state unchanged. Otherwise it fills in the block header, flags every
// transaction as TxValidationCode_VALID in the TRANSACTIONS_FILTER metadata
// slot, advances blockNum and records the new header hash as the previous
// hash for the following block.
func (bg *blkGenerator) nextBlock() *common.Block {
	block := protoutil.NewBlock(bg.blockNum, bg.previousBlockHash)

	collected := 0
	for envBytes := range bg.txQueue {
		block.Data.Data = append(block.Data.Data, envBytes)
		collected++
		if collected == bg.batchConf.BatchSize {
			break
		}
	}

	// close() has been called and no pending tx
	if len(block.Data.Data) == 0 {
		return nil
	}

	// Header fields must all be set before the header hash is taken below.
	block.Header.Number = bg.blockNum
	block.Header.PreviousHash = bg.previousBlockHash
	block.Header.DataHash = protoutil.BlockDataHash(block.Data)

	block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = util.NewTxValidationFlagsSetValue(
		len(block.Data.Data),
		peer.TxValidationCode_VALID,
	)

	// Advance the generator state for the following block.
	bg.previousBlockHash = protoutil.BlockHeaderHash(block.Header)
	bg.blockNum++
	return block
}
// close shuts the generator down. The order of the three steps matters:
// closing srQueue ends the workers' receive loops once the queue is drained,
// waiting on wg lets them finish sending to txQueue, and only then is
// txQueue closed so that nextBlock's receive loop can terminate.
// Note that workers block on a full txQueue, so bg.wg.Wait() can only return
// if something keeps draining txQueue (e.g. concurrent nextBlock calls).
func (bg *blkGenerator) close() {
// No more simulation results will be submitted.
close(bg.srQueue)
// Wait for the worker goroutines to signal completion.
bg.wg.Wait()
// Nothing sends on txQueue any more; signal end-of-stream to nextBlock.
close(bg.txQueue)
}