-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
pool.go
231 lines (194 loc) · 6.07 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package wsrpc
import (
"context"
"errors"
"sync"
"github.com/smartcontractkit/wsrpc/credentials"
"golang.org/x/exp/maps"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/csakey"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
var _ Client = &clientCheckout{}
type clientCheckout struct {
*connection // inherit all methods from client, with override on Start/Close
}
func (cco *clientCheckout) Start(_ context.Context) error {
return nil
}
func (cco *clientCheckout) Close() error {
cco.connection.checkin(cco)
return nil
}
type connection struct {
// Client will be nil when checkouts is empty, if len(checkouts) > 0 then it is expected to be a non-nil, started client
Client
lggr logger.Logger
clientPrivKey csakey.KeyV2
serverPubKey []byte
serverURL string
pool *pool
checkouts []*clientCheckout // reference count, if this goes to zero the connection should be closed and *client nilified
mu sync.Mutex
}
func (conn *connection) checkout(ctx context.Context) (cco *clientCheckout, err error) {
conn.mu.Lock()
defer conn.mu.Unlock()
if err = conn.ensureStartedClient(ctx); err != nil {
return nil, err
}
cco = &clientCheckout{conn}
conn.checkouts = append(conn.checkouts, cco)
return cco, nil
}
// not thread-safe, access must be serialized
func (conn *connection) ensureStartedClient(ctx context.Context) error {
if len(conn.checkouts) == 0 {
conn.Client = conn.pool.newClient(conn.lggr, conn.clientPrivKey, conn.serverPubKey, conn.serverURL, conn.pool.cacheSet)
return conn.Client.Start(ctx)
}
return nil
}
func (conn *connection) checkin(checkinCco *clientCheckout) {
conn.mu.Lock()
defer conn.mu.Unlock()
var removed bool
for i, cco := range conn.checkouts {
if cco == checkinCco {
conn.checkouts = utils.DeleteUnstable(conn.checkouts, i)
removed = true
break
}
}
if !removed {
panic("tried to check in client that was never checked out")
}
if len(conn.checkouts) == 0 {
if err := conn.Client.Close(); err != nil {
// programming error if we hit this
panic(err)
}
conn.Client = nil
conn.pool.remove(conn.serverURL, conn.clientPrivKey.StaticSizedPublicKey())
}
}
func (conn *connection) forceCloseAll() (err error) {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.Client != nil {
err = conn.Client.Close()
if errors.Is(err, utils.ErrAlreadyStopped) {
// ignore error if it has already been stopped; no problem
err = nil
}
conn.Client = nil
conn.checkouts = nil
}
return
}
type Pool interface {
services.ServiceCtx
// Checkout gets a wsrpc.Client for the given arguments
// The same underlying client can be checked out multiple times, the pool
// handles lifecycle management. The consumer can treat it as if it were
// its own unique client.
Checkout(ctx context.Context, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string) (client Client, err error)
}
// WSRPC allows only one connection per client key per server
type pool struct {
lggr logger.Logger
// server url => client public key => connection
connections map[string]map[credentials.StaticSizedPublicKey]*connection
// embedding newClient makes testing/mocking easier
newClient func(lggr logger.Logger, privKey csakey.KeyV2, serverPubKey []byte, serverURL string, cacheSet cache.CacheSet) Client
mu sync.RWMutex
cacheSet cache.CacheSet
closed bool
}
func NewPool(lggr logger.Logger, cacheCfg cache.Config) Pool {
lggr = lggr.Named("Mercury.WSRPCPool")
p := newPool(lggr)
p.newClient = NewClient
p.cacheSet = cache.NewCacheSet(lggr, cacheCfg)
return p
}
func newPool(lggr logger.Logger) *pool {
return &pool{
lggr: lggr,
connections: make(map[string]map[credentials.StaticSizedPublicKey]*connection),
}
}
func (p *pool) Checkout(ctx context.Context, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string) (client Client, err error) {
clientPubKey := clientPrivKey.StaticSizedPublicKey()
p.mu.Lock()
if p.closed {
p.mu.Unlock()
return nil, errors.New("pool is closed")
}
server, exists := p.connections[serverURL]
if !exists {
server = make(map[credentials.StaticSizedPublicKey]*connection)
p.connections[serverURL] = server
}
conn, exists := server[clientPubKey]
if !exists {
conn = p.newConnection(p.lggr, clientPrivKey, serverPubKey, serverURL)
server[clientPubKey] = conn
}
p.mu.Unlock()
// checkout outside of pool lock since it might take non-trivial time
// the clientCheckout will be checked in again when its Close() method is called
// this also should avoid deadlocks between conn.mu and pool.mu
return conn.checkout(ctx)
}
// remove performs garbage collection on the connections map after connections are no longer used
func (p *pool) remove(serverURL string, clientPubKey credentials.StaticSizedPublicKey) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.connections[serverURL], clientPubKey)
if len(p.connections[serverURL]) == 0 {
delete(p.connections, serverURL)
}
}
func (p *pool) newConnection(lggr logger.Logger, clientPrivKey csakey.KeyV2, serverPubKey []byte, serverURL string) *connection {
return &connection{
lggr: lggr,
clientPrivKey: clientPrivKey,
serverPubKey: serverPubKey,
serverURL: serverURL,
pool: p,
}
}
func (p *pool) Start(ctx context.Context) error {
return p.cacheSet.Start(ctx)
}
func (p *pool) Close() (merr error) {
p.mu.Lock()
defer p.mu.Unlock()
p.closed = true
for _, clientPubKeys := range p.connections {
for _, conn := range clientPubKeys {
merr = errors.Join(merr, conn.forceCloseAll())
}
}
merr = errors.Join(merr, p.cacheSet.Close())
return
}
func (p *pool) Name() string {
return p.lggr.Name()
}
func (p *pool) Ready() error {
p.mu.RLock()
defer p.mu.RUnlock()
if p.closed {
return errors.New("pool is closed")
}
return nil
}
func (p *pool) HealthReport() map[string]error {
hp := map[string]error{p.Name(): p.Ready()}
maps.Copy(hp, p.cacheSet.HealthReport())
return hp
}