-
Notifications
You must be signed in to change notification settings - Fork 197
/
sshclients.go
165 lines (147 loc) · 4 KB
/
sshclients.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package srvconn
import (
"time"
"github.com/jumpserver/koko/pkg/logger"
)
// UserSSHClient groups the cached SSH clients that were stored under one
// cache key.
type UserSSHClient struct {
ID string // Key of this user SSH client; see MakeReuseSSHClientKey.
// data maps each cached client to the UnixNano timestamp taken when it was
// added via AddClient.
data map[*SSHClient]int64
// name is a label included in the recycle log message.
name string
}
// AddClient records client in the set, stamping it with the current time in
// nanoseconds. Adding a client that is already present overwrites its stamp.
func (u *UserSSHClient) AddClient(client *SSHClient) {
	addedAt := time.Now().UnixNano()
	u.data[client] = addedAt
}
// GetClient returns the cached client with the fewest references, or nil when
// no client is cached.
//
// Choosing the least-referenced client spreads new sessions across the cached
// connections instead of piling them onto a single one. When several clients
// share the lowest count, which of them is returned is unspecified because
// map iteration order is not defined.
func (u *UserSSHClient) GetClient() *SSHClient {
	var selected *SSHClient
	var lowest int32
	for candidate := range u.data {
		// Read the counter once so the value compared is the value kept.
		count := candidate.RefCount()
		if selected == nil || count < lowest {
			selected, lowest = candidate, count
		}
	}
	return selected
}
// recycleClients closes and evicts every cached client whose RefCount and
// selfRef are both at or below zero. If at least one client was evicted it
// logs how many were removed and how many remain.
func (u *UserSSHClient) recycleClients() {
	removed := 0
	for c := range u.data {
		if c.RefCount() > 0 || c.selfRef() > 0 {
			continue
		}
		// Best-effort close: the client is discarded regardless of the result.
		_ = c.Close()
		// Deleting the current key while ranging over a map is safe in Go.
		delete(u.data, c)
		removed++
	}
	if removed == 0 {
		return
	}
	logger.Infof("Remove %d clients (%s) remain %d",
		removed, u.name, len(u.data))
}
// Count reports how many clients are currently held in the set.
func (u *UserSSHClient) Count() int {
	total := len(u.data)
	return total
}
// newSSHManager creates an SSHManager whose channels are all unbuffered and
// starts its run loop in a new goroutine before returning it.
func newSSHManager() *SSHManager {
	manager := new(SSHManager)
	manager.storeChan = make(chan *storeClient)
	manager.reqChan = make(chan string)
	manager.resultChan = make(chan *SSHClient)
	manager.releaseChan = make(chan *storeClient)
	go manager.run()
	return manager
}
// SSHManager is the front end of the SSH client cache. Every operation is
// sent over one of these channels and handled, one at a time, by the run loop.
type SSHManager struct {
// storeChan carries clients to be added to the cache.
storeChan chan *storeClient
reqChan chan string // reqId: the cache key to look up
// resultChan carries the answer to a lookup sent on reqChan; nil means
// nothing was found.
resultChan chan *SSHClient
// releaseChan carries clients whose cache reference is being released.
releaseChan chan *storeClient
}
// run is the manager's event loop. It owns the cache map (cache key ->
// UserSSHClient) and is the only code that touches it, handling one event per
// iteration:
//
//   - ticker (every minute): if no request has been handled for more than a
//     minute, recycle unreferenced clients in every UserSSHClient and drop
//     the UserSSHClients that end up empty;
//   - reqChan: look up a client for the key and reply on resultChan (nil when
//     nothing is cached), taking a self reference on a hit;
//   - storeChan: take a self reference on the client and add it under its key;
//   - releaseChan: drop the self reference and recycle the key's clients, or
//     close the client directly when its key is not cached.
//
// The loop never returns.
func (s *SSHManager) run() {
	tick := time.NewTicker(time.Minute)
	defer tick.Stop()
	data := make(map[string]*UserSSHClient)
	latestVisited := time.Now()
	for {
		select {
		case now := <-tick.C:
			// 1. After one minute without any request, let every
			//    UserSSHClient recycle its clients.
			// 2. Then remove every UserSSHClient whose count is zero.
			if now.After(latestVisited.Add(time.Minute)) {
				needRemovedClients := make([]string, 0, len(data))
				for key, userClient := range data {
					userClient.recycleClients()
					if userClient.Count() == 0 {
						needRemovedClients = append(needRemovedClients, key)
					}
				}
				if len(needRemovedClients) > 0 {
					for i := range needRemovedClients {
						delete(data, needRemovedClients[i])
					}
					logger.Infof("Remove %d cache ssh clients remain %d",
						len(needRemovedClients), len(data))
				}
			}
			// A tick is not a visit: skip the latestVisited update below.
			continue
		case reqKey := <-s.reqChan:
			var foundClient *SSHClient
			if userClient, ok := data[reqKey]; ok {
				foundClient = userClient.GetClient()
				// A UserSSHClient can be empty between a release and the
				// next tick, so only report (and reference) a real hit.
				if foundClient != nil {
					logger.Infof("Found client(%s) and remain %d",
						foundClient, userClient.Count())
					foundClient.increaseSelfRef()
				}
			}
			// Always answer, even with nil, so the requester is unblocked.
			s.resultChan <- foundClient
		case reqClient := <-s.storeChan:
			reqClient.SSHClient.increaseSelfRef()
			userClient, ok := data[reqClient.key]
			if !ok {
				userClient = &UserSSHClient{
					ID:   reqClient.key,
					name: reqClient.SSHClient.String(),
					data: make(map[*SSHClient]int64),
				}
				data[reqClient.key] = userClient
			}
			userClient.AddClient(reqClient.SSHClient)
			logger.Infof("Store new client(%s) remain %d", reqClient.String(), userClient.Count())
		case reqClient := <-s.releaseChan:
			// On a release request, free the corresponding SSHClient promptly.
			reqClient.decreaseSelfRef()
			if userClient, ok := data[reqClient.key]; ok {
				userClient.recycleClients()
			} else {
				_ = reqClient.Close()
				logger.Infof("SSH client(%s) not found in user ssh cache and close", reqClient.String())
			}
		}
		latestVisited = time.Now()
	}
}
// getClientFromCache asks the run loop for a client cached under key and
// waits for the reply. It returns the client and true on a hit, or nil and
// false when nothing is cached for key.
func (s *SSHManager) getClientFromCache(key string) (*SSHClient, bool) {
	s.reqChan <- key
	if cached := <-s.resultChan; cached != nil {
		return cached, true
	}
	return nil, false
}
// AddClientCache hands client to the run loop to be cached under key. The
// send blocks until the run loop receives it.
func (s *SSHManager) AddClientCache(key string, client *SSHClient) {
	entry := new(storeClient)
	entry.key = key
	entry.SSHClient = client
	s.storeChan <- entry
}
// ReleaseClientCacheKey tells the run loop that the caller is done with the
// client cached under key. The send blocks until the run loop receives it.
func (s *SSHManager) ReleaseClientCacheKey(key string, client *SSHClient) {
	released := &storeClient{SSHClient: client, key: key}
	s.releaseChan <- released
}
// storeClient pairs an SSH client with the cache key it belongs to; it is the
// message type sent on the manager's store and release channels.
type storeClient struct {
// key is the cache key the client is stored under.
key string
// The embedded client's methods are promoted onto storeClient.
*SSHClient
}