/
nats_requester.go
74 lines (66 loc) · 1.65 KB
/
nats_requester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package requester
import (
"strconv"
"time"
"github.com/nats-io/go-nats"
"github.com/tylertreat/bench"
)
// NATSRequesterFactory implements RequesterFactory by creating a Requester
// which publishes messages to NATS and waits to receive them back.
type NATSRequesterFactory struct {
// URL is the server address each Requester hands to nats.Connect.
URL string
// PayloadSize is the length, in bytes, of the message each Requester publishes.
PayloadSize int
// Subject is the base subject name; GetRequester appends "-<num>" to it.
Subject string
}
// GetRequester builds the Requester used by one Benchmark connection.
//
// num is rendered in base 10 and appended to the factory's Subject after a
// "-" separator, so distinct connection numbers yield distinct subjects. The
// URL and PayloadSize are copied into the new Requester unchanged.
func (n *NATSRequesterFactory) GetRequester(num uint64) bench.Requester {
	suffix := strconv.FormatUint(num, 10)
	requester := natsRequester{
		url:         n.URL,
		payloadSize: n.PayloadSize,
		subject:     n.Subject + "-" + suffix,
	}
	return &requester
}
// natsRequester implements Requester by publishing a message to NATS and
// waiting to receive it on a subscription to the same subject.
type natsRequester struct {
// url is the address passed to nats.Connect in Setup.
url string
// payloadSize is the length, in bytes, of msg.
payloadSize int
// subject is both the publish target and the subscribed subject.
subject string
// conn is the connection opened by Setup and closed by Teardown.
conn *nats.Conn
// sub is the synchronous subscription on subject, created by Setup.
sub *nats.Subscription
// msg is the zero-filled payload allocated by Setup and sent by every Request.
msg []byte
}
// Setup readies the Requester for benchmarking: it connects to the server at
// url, opens a synchronous subscription on subject, and allocates the
// zero-filled payload of payloadSize bytes.
//
// The receiver's fields are only assigned once both steps have succeeded. If
// subscribing fails, the freshly opened connection is closed before the error
// is returned.
func (n *natsRequester) Setup() error {
	nc, err := nats.Connect(n.url)
	if err != nil {
		return err
	}

	subscription, subErr := nc.SubscribeSync(n.subject)
	if subErr != nil {
		nc.Close()
		return subErr
	}

	n.conn, n.sub = nc, subscription
	n.msg = make([]byte, n.payloadSize)
	return nil
}
// Request performs one synchronous round trip against the system under test:
// it publishes the prepared payload on subject, then waits on the
// subscription for the next message, using a 30 second timeout.
//
// The received message is discarded; only the error from publishing or from
// waiting is reported.
func (n *natsRequester) Request() error {
	pubErr := n.conn.Publish(n.subject, n.msg)
	if pubErr != nil {
		return pubErr
	}

	if _, recvErr := n.sub.NextMsg(30 * time.Second); recvErr != nil {
		return recvErr
	}
	return nil
}
// Teardown is called upon benchmark completion. It removes the subscription
// and closes the connection, clearing both fields.
//
// The connection is always closed, even when Unsubscribe fails, so a failed
// unsubscribe no longer leaks the connection; the Unsubscribe error is still
// returned to the caller. Nil handles are skipped, which makes Teardown safe
// to call again or after a Setup that did not complete.
func (n *natsRequester) Teardown() error {
	var err error
	if n.sub != nil {
		err = n.sub.Unsubscribe()
		n.sub = nil
	}
	if n.conn != nil {
		// Close regardless of the unsubscribe result so the connection is
		// never left open.
		n.conn.Close()
		n.conn = nil
	}
	return err
}