-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client_pool.go
150 lines (128 loc) · 3.22 KB
/
client_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package goe2ee
import (
	"errors"
	"sync"

	"github.com/rafaeljusto/goe2ee/protocol"
)
// ClientPool is a pool of clients. It is useful when you need to connect to the
// same host multiple times.
//
// The struct embeds a sync.Mutex by value, so a ClientPool must not be copied
// once it is in use; call its methods through a pointer.
type ClientPool struct {
// hostport is the address handed to the dial functions for every new client.
hostport string
// poolMutex is held by Get, put and Close while they read or modify
// idleClients, numberOfClients, secret and id.
poolMutex sync.Mutex
// clientOptions are forwarded unchanged to every dial call.
clientOptions []func(*ClientOptions)
// idleClients holds returned clients that are ready to be borrowed again.
// Get takes from the front, put appends to the back.
idleClients []*Client
// numberOfClients counts the clients created by Get that have not been
// closed by the pool yet.
numberOfClients int64
// maxIdleClients caps len(idleClients); put closes a returned client instead
// of keeping it once the cap is reached.
maxIdleClients int64
// maxOpenClients is the value of numberOfClients at which Get stops dialing
// and waits for a client to be returned.
maxOpenClients int64
// waitingClients is the channel on which put hands a returned client
// directly to a Get call that is blocked waiting. Close closes it.
waitingClients chan *Client
// secret and id are copied from the first client the pool dials and are
// assigned to every client dialed afterwards. A nil secret means that no
// client has been dialed yet (or that the pool was closed).
secret []byte
id [16]byte
}
// NewClientPool creates a new client pool for hostport. No connection is
// opened here; clients are dialed lazily by Get.
//
// maxIdleClients is the number of returned clients the pool keeps around for
// reuse, and maxOpenClients is the number of live clients at which Get starts
// waiting instead of dialing. Every optFunc is forwarded to each dial call.
//
// The pool is returned by value; its methods use pointer receivers, so store
// it in a variable and do not copy it after first use.
func NewClientPool(
	hostport string,
	maxIdleClients int64,
	maxOpenClients int64,
	optFunc ...func(*ClientOptions),
) ClientPool {
	// Unbuffered: a returned client is handed over only when a Get call is
	// already blocked on the receiving side.
	handoff := make(chan *Client)

	return ClientPool{
		hostport:       hostport,
		maxOpenClients: maxOpenClients,
		maxIdleClients: maxIdleClients,
		waitingClients: handoff,
		clientOptions:  optFunc,
	}
}
// Get retrieves a client from the pool.
//
// Idle clients are tried first, oldest first. Each candidate is checked with
// testOnBorrow and discarded when the check fails. If no idle client is usable
// and numberOfClients has reached maxOpenClients, Get blocks until put hands
// over a returned client. Otherwise a new connection is dialed.
//
// The first connection of the pool is created with DialTCP, and its secret
// and id are stored in the pool. Every later connection is created with
// dialTCP and receives the stored secret and id.
//
// Get returns an error when dialing fails, or when the pool is closed while
// the call is waiting for a client to be returned.
func (cp *ClientPool) Get() (*Client, error) {
	cp.poolMutex.Lock()

	for len(cp.idleClients) > 0 {
		client := cp.idleClients[0]
		cp.idleClients[0] = nil // do not keep the client reachable through the backing array
		cp.idleClients = cp.idleClients[1:]

		// The health check performs network I/O, so run it without the lock.
		cp.poolMutex.Unlock()
		if cp.testOnBorrow(client) {
			return client, nil
		}
		// Discard with the unexported close, as put and Close do.
		// NOTE(review): the exported Close on a pooled client presumably routes
		// back into the pool; confirm against Client.Close.
		_ = client.close() // ignoring error as the connection is already busted
		cp.poolMutex.Lock()
		cp.numberOfClients--
	}

	if cp.numberOfClients >= cp.maxOpenClients {
		// At capacity: wait for put to hand over a returned client.
		// NOTE(review): put only hands over when a receiver is already blocked
		// here. A put that runs between the Unlock and the receive stores the
		// client in the idle list instead, and this call keeps waiting for the
		// next put.
		cp.poolMutex.Unlock()
		client, ok := <-cp.waitingClients
		if !ok {
			// Close closed the channel; there is no client to test, so report
			// the shutdown instead of dereferencing a nil client.
			return nil, errors.New("goe2ee: client pool closed")
		}
		if cp.testOnBorrow(client) {
			return client, nil
		}
		_ = client.close() // ignoring error as the connection is already busted
		cp.poolMutex.Lock()
		cp.numberOfClients--
	}

	// The lock stays held while dialing, which serialises the first DialTCP
	// and the write of cp.secret and cp.id.
	firstClient := cp.secret == nil

	var client *Client
	var err error
	if firstClient {
		client, err = DialTCP(cp.hostport, cp.clientOptions...)
	} else {
		client, err = dialTCP(cp.hostport, cp.clientOptions...)
	}
	if err != nil {
		cp.poolMutex.Unlock()
		return nil, err
	}

	client.pool = cp
	if firstClient {
		// Remember the credentials of the first client for all later ones.
		cp.secret, cp.id = client.secret, client.id
	} else {
		client.secret, client.id = cp.secret, cp.id
	}

	cp.numberOfClients++
	cp.poolMutex.Unlock()
	return client, nil
}
// put gives a client back to the pool.
//
// If a Get call is blocked waiting for a client, the client is handed to it
// directly. Otherwise the client is appended to the idle list, unless the list
// already holds maxIdleClients entries, in which case the client is closed and
// numberOfClients is decremented. If the pool has already been closed, the
// client is closed and not stored.
//
// The returned error, if any, comes from closing the client.
func (cp *ClientPool) put(client *Client) error {
	cp.poolMutex.Lock()
	defer cp.poolMutex.Unlock()

	// Detect a closed pool before sending: a send on a closed channel panics.
	// Every send on waitingClients visible in this file is the non-blocking
	// one below, made while holding poolMutex, so with the lock held a receive
	// can only succeed because Close has closed the channel.
	select {
	case <-cp.waitingClients:
		// Close already reset the pool counters; just release the connection.
		return client.close()
	default:
	}

	// Hand the client straight to a blocked Get, if there is one. The send is
	// non-blocking: without a ready receiver the default case is taken.
	select {
	case cp.waitingClients <- client:
		return nil
	default:
	}

	// Compare in int64: converting maxIdleClients to int would truncate on
	// platforms where int is 32 bits wide.
	if int64(len(cp.idleClients)) >= cp.maxIdleClients {
		cp.numberOfClients--
		return client.close()
	}

	cp.idleClients = append(cp.idleClients, client)
	return nil
}
// Close closes all the idle clients in the pool and resets the pool state:
// the idle list is emptied, the client counter is zeroed and the stored secret
// and id are cleared.
//
// Every idle client is closed even when an earlier one fails to close; the
// first error encountered is returned. Clients that are currently borrowed
// are not touched. Closing waitingClients wakes up any Get call blocked on
// it. Calling Close more than once is safe.
func (cp *ClientPool) Close() error {
	cp.poolMutex.Lock()
	defer cp.poolMutex.Unlock()

	var firstErr error
	for _, client := range cp.idleClients {
		// Keep going on failure so that no connection is left open.
		if err := client.close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	cp.idleClients = nil
	cp.numberOfClients = 0
	cp.secret = nil
	cp.id = [16]byte{}

	// Closing an already closed channel panics. Every send on waitingClients
	// visible in this file is non-blocking and made while holding poolMutex,
	// so with the lock held a receive can only succeed because an earlier
	// Close has closed the channel.
	select {
	case <-cp.waitingClients:
		// Already closed by a previous call.
	default:
		close(cp.waitingClients)
	}

	return firstErr
}
// testOnBorrow reports whether client is still usable. It writes a hello
// request to the client's connection and parses the common response read from
// the same connection. The result is true only when the whole request was
// written, the response parsed without error and the response reports
// success.
//
// NOTE(review): no deadline is set in this function, so how long it can block
// on an unresponsive peer depends on how the connection was configured.
func (cp *ClientPool) testOnBorrow(client *Client) bool {
	hello := protocol.NewHelloRequest()
	payload := hello.Bytes()

	written, err := client.conn.Write(payload)
	if err != nil {
		return false
	}
	if written != len(payload) {
		// A short write means the probe never fully reached the peer.
		return false
	}

	response, err := protocol.ParseResponseCommon(client.conn)
	return err == nil && response.Success()
}