-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
226 lines (188 loc) · 5.75 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package performance
import (
"io"
"sync"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
// pkgLogID is the name passed to flogging when obtaining this package's logger.
const pkgLogID = "orderer/common/performance"

// logger is the package-level logger; it is assigned in init.
var logger *logging.Logger

// init obtains the package logger from flogging under pkgLogID.
func init() {
	logger = flogging.MustGetLogger(pkgLogID)
}
// BenchmarkServer is a pseudo-server that a gRPC AtomicBroadcast service can
// be registered to. Clients created from it call the registered service's
// handlers directly, in-process.
type BenchmarkServer struct {
	// server is the registered service. It is nil until RegisterService is
	// called and is reset to nil by Halt.
	server ab.AtomicBroadcastServer

	// start is closed by Start to release WaitForService callers.
	// halt is closed by Halt to release a blocked Start.
	start, halt chan struct{}
}
// Package-level state of the benchmark server pool.
var (
	// servers holds the pool created by InitializeServerPool.
	servers []*BenchmarkServer

	// index is the position in servers that GetBenchmarkServer hands out next.
	index int

	// mutex is held by GetBenchmarkServer while it reads and advances index.
	mutex sync.Mutex
)
// InitializeServerPool instantiates a Benchmark server pool of size 'number'
func InitializeServerPool(number int) {
mutex = sync.Mutex{}
index = 0
servers = make([]*BenchmarkServer, number)
for i := 0; i < number; i++ {
servers[i] = &BenchmarkServer{
server: nil,
start: make(chan struct{}),
halt: make(chan struct{}),
}
}
}
// GetBenchmarkServer hands out the next unused server in the pool and
// advances the pool cursor, all while holding mutex.
//
// It is meant to be called ONLY by the orderer main(), and only after the
// pool has been initialized. It panics once every server in the pool has
// already been handed out.
func GetBenchmarkServer() *BenchmarkServer {
	mutex.Lock()
	defer mutex.Unlock()

	if index < len(servers) {
		next := servers[index]
		index++
		return next
	}
	panic("Not enough servers in the pool!")
}
// GetBenchmarkServerPool returns the whole server pool for clients to use.
// The returned slice is the package-level pool itself, not a copy, and it is
// read without taking mutex. Call it only after the pool has been initialized.
func GetBenchmarkServerPool() []*BenchmarkServer {
	pool := servers
	return pool
}
// Start signals that the service is registered and then blocks until the
// server is halted, which keeps the calling (main) goroutine from exiting.
//
// It installs a fresh halt channel, closes start to release WaitForService
// callers, and then waits for the halt channel to be closed by Halt.
//
// NOTE(review): the fields are accessed without synchronization, and a Halt
// that runs before Start has replaced halt closes the old channel, leaving
// Start blocked. Confirm callers always WaitForService before calling Halt.
func (server *BenchmarkServer) Start() {
	server.halt = make(chan struct{})
	close(server.start) // signal waiters that service is registered
	// Block reading here to prevent process exit
	<-server.halt
}
// Halt stops the given benchmark server. It is the free-function form of the
// Halt method on *BenchmarkServer and simply delegates to it.
func Halt(server *BenchmarkServer) {
	server.Halt()
}
// Halt stops the server: it logs at debug level, drops the registered
// service, installs a fresh start channel so that later WaitForService calls
// block again, and closes halt to release a blocked Start.
//
// NOTE(review): closing halt a second time without an intervening Start
// (which replaces the channel) panics. Confirm Halt is called once per Start.
func (server *BenchmarkServer) Halt() {
	logger.Debug("Stopping benchmark server")
	server.server = nil
	server.start = make(chan struct{})
	close(server.halt)
}
// WaitForService blocks until the service of the given server has been
// registered. It is the free-function form of the WaitForService method on
// *BenchmarkServer and simply delegates to it.
func WaitForService(server *BenchmarkServer) {
	server.WaitForService()
}
// WaitForService blocks until the server's start channel is closed, which
// Start does to signal that the service is registered.
func (server *BenchmarkServer) WaitForService() {
	<-server.start
}
// RegisterService stores s as the service this server dispatches to. Clients
// created by CreateBroadcastClient and CreateDeliverClient invoke its
// Broadcast and Deliver handlers. A later call replaces the previous service.
func (server *BenchmarkServer) RegisterService(s ab.AtomicBroadcastServer) {
	server.server = s
}
// CreateBroadcastClient creates a broadcast client of this server.
//
// It starts a goroutine that runs the registered service's Broadcast handler
// with the new client acting as the stream. When the handler returns, its
// return value is made available on the channel returned by Errors.
//
// A service must have been registered first: with a nil service the spawned
// goroutine panics when it invokes the handler.
func (server *BenchmarkServer) CreateBroadcastClient() *BroadcastClient {
	// errChan has capacity 1 because exactly one value is ever sent on it.
	// The buffer lets the handler goroutine exit as soon as Broadcast returns
	// instead of leaking when the caller never reads from Errors.
	client := &BroadcastClient{
		requestChan:  make(chan *cb.Envelope),
		responseChan: make(chan *ab.BroadcastResponse),
		errChan:      make(chan error, 1),
	}

	go func() {
		client.errChan <- server.server.Broadcast(client)
	}()

	return client
}
// BroadcastClient is an in-process client for the `broadcast` API. It doubles
// as the stream object handed to the service's Broadcast handler: the caller
// side uses SendRequest/GetResponse/Close/Errors, the handler side uses
// Recv/Send/Context.
type BroadcastClient struct {
	// ServerStream is embedded to fill in the remaining stream methods.
	// CreateBroadcastClient leaves it nil, so only the methods defined on
	// BroadcastClient itself are safe to call.
	grpc.ServerStream

	// requestChan carries envelopes from SendRequest to Recv; Close closes it.
	requestChan chan *cb.Envelope

	// responseChan carries responses from Send to GetResponse.
	responseChan chan *ab.BroadcastResponse

	// errChan receives the return value of the Broadcast handler.
	errChan chan error
}
// Context returns a fresh background context that carries an empty peer.Peer.
// It overrides the Context method of the embedded grpc.ServerStream.
func (BroadcastClient) Context() context.Context {
	caller := &peer.Peer{}
	return peer.NewContext(context.Background(), caller)
}
// SendRequest sends an envelope to the `broadcast` API synchronously: the
// request channel is unbuffered where this file creates it, so the call
// blocks until the handler picks the envelope up through Recv.
func (bc *BroadcastClient) SendRequest(request *cb.Envelope) {
	// TODO make this async
	bc.requestChan <- request
}
// GetResponse blocks until the `broadcast` handler emits a response through
// Send, and returns that response.
func (bc *BroadcastClient) GetResponse() *ab.BroadcastResponse {
	resp := <-bc.responseChan
	return resp
}
// Close closes the broadcast client's request channel, after which Recv
// reports io.EOF to the handler. Because it closes a channel, calling Close
// twice, or calling SendRequest after Close, panics.
func (bc *BroadcastClient) Close() {
	close(bc.requestChan)
}
// Errors returns, as a receive-only channel, the channel on which the return
// value of the broadcast handler is delivered.
func (bc *BroadcastClient) Errors() <-chan error {
	var results <-chan error = bc.errChan
	return results
}
// Send implements the AtomicBroadcast_BroadcastServer interface. It hands the
// handler's response to the caller by pushing it onto the response channel
// and always returns nil. The response channel is unbuffered where this file
// creates it, so Send blocks until GetResponse receives the value.
func (bc *BroadcastClient) Send(br *ab.BroadcastResponse) error {
	bc.responseChan <- br
	return nil
}
// Recv implements the AtomicBroadcast_BroadcastServer interface. It blocks
// for the next envelope queued by SendRequest and returns it. Once the
// request channel has been closed by Close, it returns a nil envelope
// together with io.EOF.
func (bc *BroadcastClient) Recv() (*cb.Envelope, error) {
	if env, open := <-bc.requestChan; open {
		return env, nil
	}
	return nil, io.EOF
}
// CreateDeliverClient creates a deliver client of this server.
//
// It starts a goroutine that runs the registered service's Deliver handler
// with the new client acting as the stream. When the handler returns, its
// return value is made available on the client's ResultChan.
//
// A service must have been registered first: with a nil service the spawned
// goroutine panics when it invokes the handler.
func (server *BenchmarkServer) CreateDeliverClient() *DeliverClient {
	// ResultChan has capacity 1 because exactly one value is ever sent on it.
	// The buffer lets the handler goroutine exit as soon as Deliver returns
	// instead of leaking when the caller never reads from ResultChan.
	client := &DeliverClient{
		requestChan:  make(chan *cb.Envelope),
		ResponseChan: make(chan *ab.DeliverResponse),
		ResultChan:   make(chan error, 1),
	}

	go func() {
		client.ResultChan <- server.server.Deliver(client)
	}()

	return client
}
// DeliverClient is an in-process client for the `deliver` API. It doubles as
// the stream object handed to the service's Deliver handler: the caller side
// uses SendRequest/GetResponse/Close and the exported channels, the handler
// side uses Recv/Send/Context.
type DeliverClient struct {
	// ServerStream is embedded to fill in the remaining stream methods.
	// CreateDeliverClient leaves it nil, so only the methods defined on
	// DeliverClient itself are safe to call.
	grpc.ServerStream

	// requestChan carries envelopes from SendRequest to Recv; Close closes it.
	requestChan chan *cb.Envelope

	// ResponseChan carries responses from Send to GetResponse. It is
	// exported, so callers may also receive from it directly.
	ResponseChan chan *ab.DeliverResponse

	// ResultChan receives the return value of the Deliver handler.
	ResultChan chan error
}
// Context returns a fresh background context that carries an empty peer.Peer.
// It overrides the Context method of the embedded grpc.ServerStream.
func (DeliverClient) Context() context.Context {
	caller := &peer.Peer{}
	return peer.NewContext(context.Background(), caller)
}
// SendRequest sends an envelope to the `deliver` API synchronously: the
// request channel is unbuffered where this file creates it, so the call
// blocks until the handler picks the envelope up through Recv.
func (bc *DeliverClient) SendRequest(request *cb.Envelope) {
	// TODO make this async
	bc.requestChan <- request
}
// GetResponse blocks until the `deliver` handler emits a response through
// Send, and returns that response.
func (bc *DeliverClient) GetResponse() *ab.DeliverResponse {
	resp := <-bc.ResponseChan
	return resp
}
// Close closes the deliver client's request channel, after which Recv
// reports io.EOF to the handler. Because it closes a channel, calling Close
// twice, or calling SendRequest after Close, panics.
func (bc *DeliverClient) Close() {
	close(bc.requestChan)
}
// Send implements the AtomicBroadcast_DeliverServer interface. It hands the
// handler's response to the caller by pushing it onto ResponseChan and always
// returns nil. ResponseChan is unbuffered where this file creates it, so Send
// blocks until the value is received.
func (bc *DeliverClient) Send(br *ab.DeliverResponse) error {
	bc.ResponseChan <- br
	return nil
}
// Recv implements the AtomicBroadcast_DeliverServer interface. It blocks for
// the next envelope queued by SendRequest and returns it. Once the request
// channel has been closed by Close, it returns a nil envelope together with
// io.EOF.
func (bc *DeliverClient) Recv() (*cb.Envelope, error) {
	if env, open := <-bc.requestChan; open {
		return env, nil
	}
	return nil, io.EOF
}