forked from smallnest/rpcx
/
xclient_pool.go
94 lines (82 loc) ยท 2.56 KB
/
xclient_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package client
import (
"sync"
"sync/atomic"
"github.com/ssdev-go/rpcx/protocol"
)
// XClientPool is a xclient pool with fixed size.
// It uses roundrobin algorithm to call its xclients.
// All xclients share the same configurations such as ServiceDiscovery and serverMessageChan.
type XClientPool struct {
count uint64
index uint64
xclients []XClient
mu sync.RWMutex
servicePath string
failMode FailMode
selectMode SelectMode
discovery ServiceDiscovery
option Option
auth string
serverMessageChan chan<- *protocol.Message
}
// NewXClientPool creates a fixed size XClient pool.
func NewXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *XClientPool {
pool := &XClientPool{
count: uint64(count),
xclients: make([]XClient, count),
servicePath: servicePath,
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
option: option,
}
for i := 0; i < count; i++ {
xclient := NewXClient(servicePath, failMode, selectMode, discovery, option)
pool.xclients[i] = xclient
}
return pool
}
// NewBidirectionalXClientPool creates a BidirectionalXClient pool with fixed size.
func NewBidirectionalXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) *XClientPool {
pool := &XClientPool{
count: uint64(count),
xclients: make([]XClient, count),
servicePath: servicePath,
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
option: option,
serverMessageChan: serverMessageChan,
}
for i := 0; i < count; i++ {
xclient := NewBidirectionalXClient(servicePath, failMode, selectMode, discovery, option, serverMessageChan)
pool.xclients[i] = xclient
}
return pool
}
// Auth sets s token for Authentication.
func (c *XClientPool) Auth(auth string) {
c.auth = auth
c.mu.RLock()
for _, v := range c.xclients {
v.Auth(auth)
}
c.mu.RUnlock()
}
// Get returns a xclient.
// It does not remove this xclient from its cache so you don't need to put it back.
// Don't close this xclient because maybe other goroutines are using this xclient.
func (p *XClientPool) Get() XClient {
i := atomic.AddUint64(&p.index, 1)
picked := int(i % p.count)
return p.xclients[picked]
}
// Close this pool.
// Please make sure it won't be used any more.
func (p *XClientPool) Close() {
for _, c := range p.xclients {
c.Close()
}
p.xclients = nil
}