forked from mediocregopher/radix.v2
/
pool.go
128 lines (114 loc) · 3.46 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package pool
import (
"github.com/mediocregopher/radix.v2/redis"
)
// Pool is a basic connection pool of redis Clients. A small set of connections
// is opened when the pool is created; when more are needed they are dialed on
// demand. A connection that is Put back while the pool is already full is
// closed instead of being stored.
type Pool struct {
	// pool buffers the idle clients that Get can hand out without dialing.
	pool chan *redis.Client

	// df is the dial function used whenever a new connection must be created.
	df DialFunc

	// Network is the network the pool connects over, exactly as it was passed
	// to New/NewCustom. It should not be changed after the pool is initialized.
	Network string

	// Addr is the address the pool connects to, exactly as it was passed to
	// New/NewCustom. It should not be changed after the pool is initialized.
	Addr string
}
// DialFunc is the signature of the connection factory accepted by NewCustom.
// It receives the pool's network and address and returns a ready-to-use client
// or the error that prevented creating one.
type DialFunc func(network string, addr string) (*redis.Client, error)
// NewCustom is like New except you can specify a DialFunc which will be used
// when creating new connections for the pool. The common use-case is to do
// authentication for new connections.
//
// It tries to dial size connections up front. If any dial fails, every
// connection opened so far is closed and an empty (but still usable) pool is
// returned alongside the dial error. The pool always has room for size idle
// connections, so a pool that started out empty can still retain clients that
// are handed to Put later on.
func NewCustom(network, addr string, size int, df DialFunc) (*Pool, error) {
	var err error
	clients := make([]*redis.Client, 0, size)
	for i := 0; i < size; i++ {
		var client *redis.Client
		if client, err = df(network, addr); err != nil {
			for _, c := range clients {
				c.Close()
			}
			// Truncate to length zero so none of the clients that were just
			// closed ends up in the pool.
			clients = clients[:0]
			break
		}
		clients = append(clients, client)
	}

	p := &Pool{
		Network: network,
		Addr:    addr,
		// Capacity is the requested size, not the number of successful dials,
		// so the pool can still hold connections after a failed start.
		pool: make(chan *redis.Client, size),
		df:   df,
	}
	for _, c := range clients {
		p.pool <- c
	}
	return p, err
}
// New builds a Pool whose connections are all dialed with
// redis.Dial(network, addr). The size is the maximum number of idle
// connections kept waiting to be used at any given moment. If an error is
// encountered, a pool is still returned alongside that error (see NewCustom).
func New(network, addr string, size int) (*Pool, error) {
	var dial DialFunc = redis.Dial
	return NewCustom(network, addr, size, dial)
}
// Get hands out a redis client. An idle client from the pool is preferred; if
// none is waiting, a new one is dialed on the spot using the pool's dial
// function, and any dial error is returned to the caller.
func (p *Pool) Get() (*redis.Client, error) {
	// Non-blocking receive: take an idle client only if one is ready.
	select {
	case idle := <-p.pool:
		return idle, nil
	default:
	}

	// Nothing idle in the pool, so create a fresh connection.
	return p.df(p.Network, p.Addr)
}
// Put gives a client back to the pool. When the pool is full the client is
// closed instead. A client that has recorded a critical error (LastCritical is
// set, e.g. after a connection failure) is not put back in the pool.
func (p *Pool) Put(conn *redis.Client) {
	if conn.LastCritical != nil {
		// The client has failed critically; do not keep it around.
		return
	}

	// Non-blocking send: store the client if there is room, else close it.
	select {
	case p.pool <- conn:
	default:
		conn.Close()
	}
}
// Cmd automatically gets one client from the pool, executes the given command
// (returning its result), and puts the client back in the pool.
//
// If the command fails with an I/O error, the failed client is discarded, a
// fresh connection is dialed and the command is attempted once more on it; the
// result of that second attempt is what gets returned. If no client can be
// obtained, or the redial fails, the error is returned wrapped in a Resp.
func (p *Pool) Cmd(cmd string, args ...interface{}) *redis.Resp {
	c, err := p.Get()
	if err != nil {
		return redis.NewResp(err)
	}

	r := c.Cmd(cmd, args...)
	if r.IsType(redis.IOErr) {
		// The failed client is about to be replaced and would otherwise be
		// dropped without ever being closed or put back, leaking its
		// connection. The result of Close is ignored because the connection is
		// already considered dead.
		// NOTE(review): assumes Close is safe to call on a client that has
		// already closed itself - confirm against the redis package.
		c.Close()

		if c, err = p.df(p.Network, p.Addr); err != nil {
			return redis.NewResp(err)
		}
		r = c.Cmd(cmd, args...)
	}

	p.Put(c)
	return r
}
// Empty drains the pool, calling Close() on every connection that is currently
// idle in it. Provided no other connections are still out waiting to be Put
// back, this effectively closes and cleans up the pool.
func (p *Pool) Empty() {
	for {
		// Non-blocking receive: stop as soon as no idle client is left.
		select {
		case idle := <-p.pool:
			idle.Close()
		default:
			return
		}
	}
}
// Avail reports how many connections are currently sitting idle in the Pool
// and can be obtained through Get without dialing. When it reports zero,
// subsequent calls to Get will create new connections on the fly.
func (p *Pool) Avail() int {
	idle := len(p.pool)
	return idle
}