-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
235 lines (189 loc) · 5.17 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package main
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net/http"
"os"
"sync"
"time"
"github.com/BurntSushi/toml"
log "github.com/Sirupsen/logrus"
"github.com/docopt/docopt-go"
"github.com/mediocregopher/radix.v2/pool"
"github.com/mediocregopher/radix.v2/redis"
)
// Cluster groups the redis connection pools the proxy talks to: one pool for
// the master and one pool per configured slave. It is registered as the
// http.Handler for "/db/" (see ServeHTTP).
type Cluster struct {
// Embedded mutex. NOTE(review): nothing in the visible code locks it.
sync.Mutex
// master is the pool for the master instance. doPOST writes through it and
// getReadSlave falls back to it when no slaves are configured.
master *pool.Pool
// slaves holds one pool per entry in Config.Slaves; reads are served from these.
slaves []*pool.Pool
}
// Config describes the redis topology. It is filled either from the --master
// flag or by decoding the TOML file given with --config.
type Config struct {
// Master is the host of the master redis instance; NewCluster appends ":6379".
Master string
// Slaves lists the hosts of the slave instances; NewCluster appends ":6379"
// to each. It may be empty.
Slaves []string
}
// usage is the docopt usage text passed to docopt.Parse in mainEx. It declares
// the accepted command-line options and the default listen port.
var usage = `
Reverse Redis Proxy
Usage:
rrproxy [options] --master=<master>
rrproxy [options] --config=<config>
rrproxy -h | --help | --version
Options:
-h, --help Show this screen.
--version Show version.
-p, --port=<port> Listen on [default: 8888].
-m, --master=<master> Master redis instance.
-c, --config=<config> Configuration file.
`
const (
	// PoolSize is the number of redis connections each pool initially opens.
	PoolSize = 10

	// ForcedLatency is how long every GET stalls the redis server (through
	// DEBUG sleep) in order to simulate a loaded box.
	ForcedLatency time.Duration = time.Millisecond
)
// main is the process entry point. It forwards the command-line arguments,
// minus the program name, to mainEx.
func main() {
	argv := os.Args[1:]
	mainEx(argv)
}
// mainEx runs the proxy with the given command-line arguments (program name
// excluded). It parses argv against the usage text, builds the Config from
// --master and/or the TOML file named by --config, connects the cluster,
// registers the HTTP handlers and serves until the listener fails.
//
// Every failure along the way is fatal: the process exits via log.Fatal.
func mainEx(argv []string) {
	opts, err := docopt.Parse(usage, argv, true, "1.0", false)
	if err != nil {
		log.Fatal(err)
	}

	var cfg Config
	if master := opts["--master"]; master != nil {
		cfg.Master = master.(string)
	}
	if cfgPath := opts["--config"]; cfgPath != nil {
		_, decodeErr := toml.DecodeFile(cfgPath.(string), &cfg)
		if decodeErr != nil {
			log.Fatal(decodeErr)
		}
		// a config file that does not name a master is unusable
		if cfg.Master == "" {
			log.Fatalf("'Master' not specified in %v", cfgPath)
		}
	}

	log.Warn("This is a toy server for use the Oslo Day of Docker Workshop 2015. Do not use in production!")

	cluster, err := NewCluster(&cfg)
	if err != nil {
		log.Fatal(err)
	}

	// basic ping endpoint, handy for testing that the server is up
	ping := func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "OK")
	}
	http.HandleFunc("/db/_ping", ping)

	// everything else under /db/ goes to the cluster (GET and POST); the
	// prefix is stripped so the handler sees only the key
	http.Handle("/db/", http.StripPrefix("/db/", cluster))

	port := opts["--port"]
	log.Infof("Starting server on *:%v", port)

	// ListenAndServe only returns on error, which is fatal
	listenAddr := fmt.Sprintf(":%s", port)
	log.Fatal(http.ListenAndServe(listenAddr, nil))
}
// NewCluster opens a connection pool of PoolSize connections to the master
// named in config and one such pool per entry in config.Slaves. Every host is
// dialled over TCP on port 6379.
//
// It returns the first connection error encountered. In that case every pool
// that was already opened is emptied, so a failed call does not leave
// connections behind.
func NewCluster(config *Config) (*Cluster, error) {
	master, err := pool.New("tcp", fmt.Sprintf("%s:6379", config.Master), PoolSize)
	if err != nil {
		return nil, err
	}

	slaves := make([]*pool.Pool, 0, len(config.Slaves))
	for _, host := range config.Slaves {
		p, err := pool.New("tcp", fmt.Sprintf("%s:6379", host), PoolSize)
		if err != nil {
			// Close the connections of every pool opened so far; otherwise
			// they would stay open with nothing left referencing them.
			master.Empty()
			for _, opened := range slaves {
				opened.Empty()
			}
			return nil, err
		}
		slaves = append(slaves, p)
	}

	log.Infof("Connected 1 master and %d slaves", len(slaves))
	return &Cluster{master: master, slaves: slaves}, nil
}
// doGET serves a read: the request path (already stripped of "/db/" by
// http.StripPrefix) is used as the redis key and its raw value is written as
// the response body.
//
// Responses:
//   - 404 when the path is empty or the key does not exist
//   - 500 when a connection cannot be obtained or any redis command fails
//   - 200 with the value otherwise
//
// The address of the pool that served the request is exposed in the
// X-RRPROXY-SERVER header, which makes debugging with curl -v easy.
func (c *Cluster) doGET(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Path
	if key == "" {
		http.NotFound(w, r)
		return
	}

	// fail logs err and reports it to the client as an internal server error.
	fail := func(err error) {
		log.Error(err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}

	slave := c.getReadSlave()
	w.Header().Add("X-RRPROXY-SERVER", slave.Addr)

	// get a redis client from the pool and make sure it is handed back
	client, err := slave.Get()
	if err != nil {
		fail(err)
		return
	}
	defer slave.Put(client)

	// Check that the key exists. A failing EXISTS is a server-side problem
	// and must not be reported as "not found".
	exists, err := client.Cmd("EXISTS", key).Int()
	if err != nil {
		fail(err)
		return
	}
	if exists == 0 {
		http.NotFound(w, r)
		return
	}

	// Introduce some artificial latency. Without it, and with redis running
	// on the same laptop as the server, other layers become the bottleneck
	// rather than redis, which defeats the purpose of this lab.
	if ForcedLatency > 0 {
		if err := client.Cmd("DEBUG", "sleep", ForcedLatency.Seconds()).Err; err != nil {
			// A failed sleep only affects this request; it must not take
			// the whole server down.
			fail(err)
			return
		}
	}

	resp := client.Cmd("GET", key)
	// The key may have been removed between EXISTS and GET.
	if resp.IsType(redis.Nil) {
		http.NotFound(w, r)
		return
	}

	// get the raw value as bytes and write it out
	data, err := resp.Bytes()
	if err != nil {
		fail(err)
		return
	}
	if _, err := w.Write(data); err != nil {
		// headers are already sent, so logging is all that is left to do
		log.Error(err)
	}
}
// doPOST serves a write: the whole request body is stored on the master under
// the request path (already stripped of "/db/" by http.StripPrefix), and the
// number of bytes stored is written back as a decimal string.
//
// An empty path is answered with 400; a failure to read the body or to run
// SET is logged and answered with 500.
func (c *Cluster) doPOST(w http.ResponseWriter, r *http.Request) {
	key := r.URL.Path
	if len(key) == 0 {
		http.Error(w, "empty path not allowed", http.StatusBadRequest)
		return
	}

	// No streaming: the complete body is buffered in memory. Acceptable only
	// because this is non-production code.
	body, readErr := ioutil.ReadAll(r.Body)
	if readErr != nil {
		log.Error(readErr)
		http.Error(w, readErr.Error(), http.StatusInternalServerError)
		return
	}

	if setErr := c.master.Cmd("SET", key, body).Err; setErr != nil {
		log.Error(setErr)
		http.Error(w, setErr.Error(), http.StatusInternalServerError)
		return
	}

	stored := fmt.Sprint(len(body))
	io.WriteString(w, stored)
}
// ServeHTTP makes Cluster an http.Handler. GET requests are passed to doGET,
// POST requests to doPOST, and any other method is rejected with
// 405 Method Not Allowed.
func (c *Cluster) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	method := r.Method

	if method == "GET" {
		c.doGET(w, r)
		return
	}
	if method == "POST" {
		c.doPOST(w, r)
		return
	}

	msg := fmt.Sprintf("%s not supported", method)
	http.Error(w, msg, http.StatusMethodNotAllowed)
}
// getReadSlave picks the pool a read should be served from: a randomly chosen
// slave, or the master when no slaves are configured (single node system).
func (c *Cluster) getReadSlave() *pool.Pool {
	n := len(c.slaves)
	if n == 0 {
		return c.master
	}

	// TODO: should the rand object be per goroutine? As written, every caller
	// goes through the shared lock of the global rand source.
	idx := rand.Int() % n
	return c.slaves[idx]
}