forked from hyperledger/fabric-sdk-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mockproducer.go
executable file
·145 lines (120 loc) · 2.95 KB
/
mockproducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
Copyright SecureKey Technologies Inc. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package mocks
import (
"fmt"
"sync"
"sync/atomic"
"time"
pb "github.com/hyperledger/fabric-sdk-go/third_party/github.com/hyperledger/fabric/protos/peer"
)
// Ledger is a MockLedger
type Ledger interface {
// Register registers an event consumer
Register(consumer Consumer)
// Unregister unregisters the given consumer
Unregister(consumer Consumer)
// NewBlock returns a new block
NewBlock(channelID string, transactions ...*TxInfo)
// NewFilteredBlock returns a new filtered block
NewFilteredBlock(channelID string, filteredTx ...*pb.FilteredTransaction)
// SendFrom sends block events to all registered consumers from the
// given block number
SendFrom(blockNum uint64)
}
// MockProducer produces events for unit testing
type MockProducer struct {
sync.RWMutex
rcvch chan interface{}
eventChannels []chan interface{}
ledger Ledger
closed int32
}
// NewMockProducer returns a new MockProducer
func NewMockProducer(ledger Ledger) *MockProducer {
c := &MockProducer{
rcvch: make(chan interface{}, 100),
ledger: ledger,
}
go c.listen()
ledger.Register(c.rcvch)
return c
}
// Close closes the event producer
func (c *MockProducer) Close() {
if !atomic.CompareAndSwapInt32(&c.closed, 0, 1) {
// Already closed
return
}
c.ledger.Unregister(c.rcvch)
close(c.rcvch)
}
// Register registers an event channel
func (c *MockProducer) Register() <-chan interface{} {
c.Lock()
defer c.Unlock()
eventch := make(chan interface{})
c.eventChannels = append(c.eventChannels, eventch)
return eventch
}
// Unregister unregisters an event channel
func (c *MockProducer) Unregister(eventch chan<- interface{}) {
c.Lock()
defer c.Unlock()
for i, e := range c.eventChannels {
if e == eventch {
if i != 0 {
c.eventChannels = c.eventChannels[1:]
}
c.eventChannels = c.eventChannels[1:]
close(eventch)
return
}
}
}
// Ledger returns the mock ledger
func (c *MockProducer) Ledger() Ledger {
return c.ledger
}
func (c *MockProducer) listen() {
for {
event, ok := <-c.rcvch
if !ok {
// Channel is closed
c.unregisterAll()
return
}
c.notifyAll(event)
}
}
func (c *MockProducer) notifyAll(event interface{}) {
c.RLock()
defer c.RUnlock()
for _, eventch := range c.eventChannels {
send(eventch, event)
}
}
func (c *MockProducer) unregisterAll() {
c.Lock()
defer c.Unlock()
for _, eventch := range c.eventChannels {
close(eventch)
}
c.eventChannels = nil
}
func send(eventch chan<- interface{}, event interface{}) {
defer func() {
// During shutdown, events may still be produced and we may
// get a 'send on closed channel' panic. Just log and ignore the error.
if p := recover(); p != nil {
fmt.Printf("panic while submitting event %#v: %s\n", event, p)
}
}()
select {
case eventch <- event:
case <-time.After(5 * time.Second):
fmt.Printf("***** Timed out sending event.\n")
}
}