forked from tylertreat/bench
/
redis_requester.go
146 lines (132 loc) · 3.52 KB
/
redis_requester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package requester
import (
"strconv"
"github.com/garyburd/redigo/redis"
"github.com/kishansairam9/bench/v2"
)
// RedisRequesterFactory implements RequesterFactory by creating a Requester
// which sends the configured command and arguments to Redis and waits for the
// reply. Every Requester it hands out is configured from the same three
// fields.
type RedisRequesterFactory struct {
// URL is the address each Requester passes to redis.Dial on the "tcp"
// network during Setup.
URL string
// Command is the Redis command name issued on every Request.
Command string
// Args are the arguments sent along with Command. The slice is handed to
// each Requester as-is (not copied), so all Requesters share it.
Args []interface{}
}
// GetRequester builds a fresh Requester carrying this factory's URL, command
// and arguments. It is called once per Benchmark connection; the connection
// number is not used. No network connection is opened here.
func (r *RedisRequesterFactory) GetRequester(uint64) bench.Requester {
	req := new(redisRequester)
	req.url = r.URL
	req.command = r.Command
	req.args = r.Args
	return req
}
// redisRequester implements Requester by sending the configured command and
// arguments to Redis and waiting for the reply.
type redisRequester struct {
// url is the address dialed over "tcp" in Setup.
url string
// command is the Redis command name sent by Request.
command string
// args are the arguments sent with command by Request.
args []interface{}
// conn is the connection opened by Setup and used by Request; Teardown
// closes it and resets the field to nil.
conn redis.Conn
}
// Setup dials Redis over TCP at the configured address and, on success,
// stores the connection for use by Request. On failure the dial error is
// returned and the stored connection is left untouched.
func (r *redisRequester) Setup() error {
	c, dialErr := redis.Dial("tcp", r.url)
	if dialErr == nil {
		r.conn = c
	}
	return dialErr
}
// Request issues the configured command with its arguments on the connection
// opened by Setup and blocks until Redis replies. The reply value itself is
// discarded; only the error, if any, is reported.
func (r *redisRequester) Request() error {
	if _, err := r.conn.Do(r.command, r.args...); err != nil {
		return err
	}
	return nil
}
// Teardown is called upon benchmark completion. It closes the connection
// opened by Setup and returns the error from Close, if any.
//
// Teardown is safe to call when no connection is held (Setup never succeeded,
// or Teardown already ran): in that case it does nothing and returns nil
// instead of panicking on a nil connection.
func (r *redisRequester) Teardown() error {
	if r.conn == nil {
		return nil
	}
	err := r.conn.Close()
	// Drop the handle even when Close reports an error so that a repeated
	// Teardown does not try to close the same connection twice.
	r.conn = nil
	return err
}
// RedisPubSubRequesterFactory implements RequesterFactory by creating a
// Requester which publishes messages to Redis and waits to receive them.
type RedisPubSubRequesterFactory struct {
// URL is the address each Requester passes to redis.Dial on the "tcp"
// network during Setup.
URL string
// PayloadSize is the length, in bytes, of the message each Requester
// publishes; the message is built from a zeroed byte slice of this size.
PayloadSize int
// Channel is the base channel name. Each Requester uses
// Channel + "-" + <connection number> so Requesters do not share a channel.
Channel string
}
// redisPubSubRequester implements Requester by publishing a message to Redis
// and waiting to receive it.
type redisPubSubRequester struct {
// url is the address dialed over "tcp" in Setup, once per connection.
url string
// payloadSize is the length in bytes of the published message.
payloadSize int
// channel is the per-Requester channel that is both subscribed to and
// published on.
channel string
// publishConn is the connection used by Request to send PUBLISH.
publishConn redis.Conn
// subscribeConn is the subscribed connection Request reads from.
subscribeConn *redis.PubSubConn
// msg is the payload built once in Setup and reused for every Request.
msg string
}
// GetRequester builds a fresh pub/sub Requester for the Benchmark connection
// identified by num. The connection number is appended to the factory's
// channel name (as "<Channel>-<num>") so every Requester gets a channel of
// its own. No network connection is opened here.
func (r *RedisPubSubRequesterFactory) GetRequester(num uint64) bench.Requester {
	suffix := strconv.FormatUint(num, 10)
	req := new(redisPubSubRequester)
	req.url = r.URL
	req.payloadSize = r.PayloadSize
	req.channel = r.Channel + "-" + suffix
	return req
}
// Setup prepares the Requester for benchmarking. It dials two TCP connections
// to the configured address, one for publishing and one for subscribing,
// subscribes the latter to the Requester's channel, reads the reply to that
// subscription, and builds the payload of payloadSize zero bytes that Request
// will publish.
//
// The Requester's fields are only populated when every step succeeds. On any
// failure, every connection opened so far is closed before the error is
// returned, so a failed Setup does not leak sockets.
func (r *redisPubSubRequester) Setup() error {
	pubConn, err := redis.Dial("tcp", r.url)
	if err != nil {
		return err
	}

	subConn, err := redis.Dial("tcp", r.url)
	if err != nil {
		// Close error intentionally dropped: the dial failure is the
		// error worth reporting.
		pubConn.Close()
		return err
	}

	subscribeConn := &redis.PubSubConn{Conn: subConn}

	// closeBoth releases both connections on a failure path. Close errors
	// are dropped because the original failure is what gets reported.
	closeBoth := func() {
		subscribeConn.Close()
		pubConn.Close()
	}

	if err := subscribeConn.Subscribe(r.channel); err != nil {
		closeBoth()
		return err
	}

	// Receive subscription message.
	if recvErr, failed := subscribeConn.Receive().(error); failed {
		closeBoth()
		return recvErr
	}

	r.publishConn = pubConn
	r.subscribeConn = subscribeConn
	r.msg = string(make([]byte, r.payloadSize))
	return nil
}
// Request publishes the prepared payload on the Requester's channel and then
// blocks until the subscribe connection delivers something. The PUBLISH
// command is buffered with Send and pushed out with Flush; its reply is not
// read here. Whatever the subscribe connection receives counts as success
// unless it is an error, in which case that error is returned.
func (r *redisPubSubRequester) Request() error {
	sendErr := r.publishConn.Send("PUBLISH", r.channel, r.msg)
	if sendErr != nil {
		return sendErr
	}

	flushErr := r.publishConn.Flush()
	if flushErr != nil {
		return flushErr
	}

	if recvErr, failed := r.subscribeConn.Receive().(error); failed {
		return recvErr
	}
	return nil
}
// Teardown is called upon benchmark completion. It closes the publish
// connection, then unsubscribes from the channel and closes the subscribe
// connection.
//
// Every step is attempted even if an earlier one fails, so an error while
// closing the publish connection or while unsubscribing no longer leaves the
// subscribe connection open. The first error encountered is returned.
// Connections that were never opened (Setup failed, or Teardown already ran)
// are skipped rather than dereferenced.
func (r *redisPubSubRequester) Teardown() error {
	var firstErr error

	if r.publishConn != nil {
		if err := r.publishConn.Close(); err != nil {
			firstErr = err
		}
		r.publishConn = nil
	}

	if r.subscribeConn != nil {
		if err := r.subscribeConn.Unsubscribe(r.channel); err != nil && firstErr == nil {
			firstErr = err
		}
		// Close regardless of the Unsubscribe outcome; closing the socket
		// is what actually releases the resource.
		if err := r.subscribeConn.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
		r.subscribeConn = nil
	}

	return firstErr
}