/
observer.go
107 lines (89 loc) · 2.87 KB
/
observer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package observer
import (
"context"
"github.com/wsw365904/wswlog/wlogging"
"time"
"github.com/wsw365904/tape/pkg/infra"
"github.com/wsw365904/tape/pkg/infra/basic"
"github.com/wsw365904/tape/pkg/infra/trafficGenerator"
"github.com/hyperledger/fabric-protos-go/peer"
"github.com/pkg/errors"
)
// Observers bundles the per-peer Observer workers with the channels and
// context they share.
type Observers struct {
// workers holds one Observer per committer, in the order CreateObservers built them.
workers []*Observer
// errorCh is passed to every worker when Start is called.
errorCh chan error
// blockCh is passed to every worker when Start is called.
blockCh chan *AddressedBlock
// ctx is the context given to CreateObservers; Start replaces it with a
// child context that carries the start time under the "start" key.
ctx context.Context
}
// Observer reads filtered-deliver responses from a single peer.
type Observer struct {
// index is the observer's position in config.Committers, assigned by CreateObservers.
index int
// Address is copied from the node's Addr field and used in logs and spans.
Address string
// d is the filtered-deliver client stream that Start receives from.
d peer.Deliver_DeliverFilteredClient
// logger receives the observer's debug output.
logger *wlogging.WswLogger
}
// CreateObservers builds one Observer for each entry in config.Committers,
// using config.Channel as the channel name, and returns them wrapped in an
// Observers value together with the supplied channels and context.
//
// Each observer's index is set to its position in config.Committers. The
// first error returned by CreateObserver aborts construction and is returned
// unchanged.
func CreateObservers(ctx context.Context, crypto infra.Crypto, errorCh chan error, blockCh chan *AddressedBlock, config basic.Config, logger *wlogging.WswLogger) (*Observers, error) {
	var observers []*Observer
	for idx, committer := range config.Committers {
		obs, err := CreateObserver(ctx, config.Channel, committer, crypto, logger)
		if err != nil {
			return nil, err
		}
		obs.index = idx
		observers = append(observers, obs)
	}

	group := new(Observers)
	group.workers = observers
	group.errorCh = errorCh
	group.blockCh = blockCh
	group.ctx = ctx
	return group, nil
}
// Start records the current time in the stored context under the "start" key
// and then launches every worker in its own goroutine, handing each one the
// shared error channel, block channel and that same start time.
//
// The key must stay the plain string "start" because GetTime looks the value
// up under exactly that key.
func (o *Observers) Start() {
	startedAt := time.Now()
	o.ctx = context.WithValue(o.ctx, "start", startedAt)

	for _, worker := range o.workers {
		go worker.Start(o.errorCh, o.blockCh, startedAt)
	}
}
// GetTime returns the start time that Start stored in the context under the
// "start" key.
//
// If no time.Time is stored under that key (for example because Start has not
// been called yet), the zero time.Time is returned instead of panicking;
// callers can detect this case with IsZero.
func (o *Observers) GetTime() time.Time {
	started, ok := o.ctx.Value("start").(time.Time)
	if !ok {
		return time.Time{}
	}
	return started
}
// CreateObserver prepares an Observer for a single node.
//
// It obtains a signed envelope from trafficGenerator.CreateSignedDeliverNewestEnv
// for the given channel, opens a filtered-deliver client to node via
// basic.CreateDeliverFilteredClient, sends the envelope, and reads one
// response which is discarded. Any error from these steps is returned
// unchanged and no Observer is produced.
//
// The returned Observer has Address set to node.Addr; its index is left at
// the zero value for the caller to assign.
func CreateObserver(ctx context.Context, channel string, node basic.Node, crypto infra.Crypto, logger *wlogging.WswLogger) (*Observer, error) {
	seekEnv, err := trafficGenerator.CreateSignedDeliverNewestEnv(channel, crypto)
	if err != nil {
		return nil, err
	}

	stream, err := basic.CreateDeliverFilteredClient(ctx, node, logger)
	if err != nil {
		return nil, err
	}

	err = stream.Send(seekEnv)
	if err != nil {
		return nil, err
	}

	// The first response is read and thrown away before the observer is handed out.
	_, err = stream.Recv()
	if err != nil {
		return nil, err
	}

	obs := &Observer{
		Address: node.Addr,
		d:       stream,
		logger:  logger,
	}
	return obs, nil
}
// Start receives responses from the peer's filtered-deliver stream in a loop
// and forwards every filtered block to blockCh as an AddressedBlock carrying
// this observer's index and the time elapsed since now.
//
// For each transaction in a block it logs a "CommitAtPeer" event and finishes
// the matching span on the global tape span.
//
// The loop ends, after sending exactly one error on errorCh, when:
//   - Recv returns an error,
//   - Recv returns a nil message, or
//   - the response is not a filtered block.
//
// Parameters:
//   - errorCh: receives the single terminating error.
//   - blockCh: receives one AddressedBlock per filtered block.
//   - now: reference time used to compute the elapsed durations.
func (o *Observer) Start(errorCh chan error, blockCh chan<- *AddressedBlock, now time.Time) {
	o.logger.Debugf("start observer for peer %s", o.Address)

	for {
		r, err := o.d.Recv()
		if err != nil {
			// Report the failure once and stop. Falling through would hit the
			// nil-message check below and send a second error, which can block
			// forever if the consumer only reads one.
			errorCh <- err
			return
		}
		if r == nil {
			errorCh <- errors.Errorf("received nil message, but expect a valid block instead. You could look into your peer logs for more info")
			return
		}

		// Use a checked assertion: any other response type (e.g. a status
		// message) would otherwise panic this goroutine.
		fb, ok := r.Type.(*peer.DeliverResponse_FilteredBlock)
		if !ok || fb.FilteredBlock == nil {
			errorCh <- errors.Errorf("unexpected deliver response %T from peer %s, expected a filtered block", r.Type, o.Address)
			return
		}
		block := fb.FilteredBlock

		for _, tx := range block.FilteredTransactions {
			basic.LogEvent(o.logger, tx.Txid, "CommitAtPeer")
			tapeSpan := basic.GetGlobalSpan()
			tapeSpan.FinishWithMap(tx.Txid, o.Address, basic.COMMIT_AT_PEER)
		}

		o.logger.Debugf("receivedTime %8.2fs\tBlock %6d\tTx %6d\t Address %s\n", time.Since(now).Seconds(), block.Number, len(block.FilteredTransactions), o.Address)
		blockCh <- &AddressedBlock{block, o.index, time.Since(now)}
	}
}