-
Notifications
You must be signed in to change notification settings - Fork 0
/
conn.go
148 lines (130 loc) · 3.4 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package deks
import (
"net"
"github.com/mediocregopher/radix.v2/redis"
"github.com/simia-tech/errx"
)
// Conn implements a client connection based on the redis protocol.
type Conn struct {
conn net.Conn
client *redis.Client
}
// Dial establishes a connection to the server at the provided url.
func Dial(url string) (*Conn, error) {
network, address, err := parseURL(url)
if err != nil {
return nil, errx.Annotatef(err, "parse url [%s]", url)
}
conn, err := net.Dial(network, address)
if err != nil {
return nil, errx.Annotatef(err, "dial [%s %s]", network, address)
}
client, err := redis.NewClient(conn)
if err != nil {
return nil, errx.Annotatef(err, "new client")
}
return &Conn{
conn: conn,
client: client,
}, nil
}
// Close tears down the connection.
func (c *Conn) Close() error {
return c.conn.Close()
}
// Ping sends a ping to the server and fails if the connection is broken.
func (c *Conn) Ping() error {
response := c.client.Cmd(cmdPing)
if !isOK(response) {
return errx.Errorf("ping command failed")
}
return nil
}
// Set sets the provided value at the provided key.
func (c *Conn) Set(key, value []byte) error {
response := c.client.Cmd(cmdSet, key, value)
if !isOK(response) {
return errx.Errorf("set command failed")
}
return nil
}
// Get returns the value at the provided key.
func (c *Conn) Get(key []byte) ([]byte, error) {
response := c.client.Cmd(cmdGet, key)
if !response.IsType(redis.Str) {
return nil, errx.Errorf("get item command failed")
}
bytes, err := response.Bytes()
if err != nil {
return nil, errx.Annotatef(err, "response bytes")
}
return bytes, nil
}
// Delete removes the value at the provided key.
func (c *Conn) Delete(key []byte) error {
response := c.client.Cmd(cmdDelete, key)
if !isOK(response) {
return errx.Errorf("delete command failed")
}
return nil
}
// Keys returns a slice containing all keys.
func (c *Conn) Keys() ([][]byte, error) {
response := c.client.Cmd(cmdKeys)
items, err := response.Array()
if err != nil {
return nil, errx.Annotatef(err, "response array")
}
keys := make([][]byte, len(items))
for index, item := range items {
key, err := item.Bytes()
if err != nil {
return nil, errx.Annotatef(err, "response bytes")
}
keys[index] = key
}
return keys, nil
}
// Tidy cleans up the store.
func (c *Conn) Tidy() error {
response := c.client.Cmd(cmdTidy)
if !isOK(response) {
return errx.Errorf("tidy command failed")
}
return nil
}
// Reconsilate sets the server into reconsilation mode and returns the underlying connection.
func (c *Conn) Reconsilate() (net.Conn, error) {
response := c.client.Cmd(cmdReconcilate)
if !isOK(response) {
return nil, errx.Errorf("reconsilate command failed")
}
c.client = nil
return c.conn, nil
}
func (c *Conn) setContainer(kh keyHash, item []byte) error {
response := c.client.Cmd(cmdSetContainer, kh[:], item)
if !isOK(response) {
return errx.Errorf("set container command failed")
}
return nil
}
func (c *Conn) getContainer(kh keyHash) ([]byte, error) {
response := c.client.Cmd(cmdGetContainer, kh[:])
if !response.IsType(redis.Str) {
return nil, errx.Errorf("get container command failed")
}
bytes, err := response.Bytes()
if err != nil {
return nil, errx.Annotatef(err, "response bytes")
}
return bytes, nil
}
func isOK(response *redis.Resp) bool {
if response.IsType(redis.Str) {
if s, _ := response.Str(); s == "OK" {
return true
}
}
return false
}