-
Notifications
You must be signed in to change notification settings - Fork 474
/
client_factory.go
180 lines (152 loc) · 4.91 KB
/
client_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package kubernetes
import (
"sync"
"time"
"k8s.io/client-go/rest"
kialiConfig "github.com/kiali/kiali/config"
"github.com/kiali/kiali/log"
"github.com/kiali/kiali/prometheus/internalmetrics"
)
// factory is the process-wide singleton client factory; created lazily by
// GetClientFactory and written only under mutex (see getClientFactory).
var factory *clientFactory

// Mutex for when modifying the stored clients
var mutex = &sync.RWMutex{}

// expirationTime is how long a cached per-token client may live before the
// background reaper (watchClients) removes it.
const expirationTime = time.Minute * 15
// ClientFactory interface for the clientFactory object
type ClientFactory interface {
	// GetClient returns a (possibly cached) client authenticated with the
	// given Kubernetes token, creating one on a cache miss.
	GetClient(token string) (ClientInterface, error)
}
// clientFactory used to generate per users clients
type clientFactory struct {
	// NOTE(review): this embedded interface field is never assigned, so any
	// ClientFactory method not implemented directly on *clientFactory would
	// panic on a nil embedded value — confirm it is only here for interface
	// satisfaction.
	ClientFactory
	// baseIstioConfig is the template rest.Config copied for every new
	// per-user client; newClient fills in the bearer token on the copy.
	baseIstioConfig *rest.Config
	// clientEntries caches one client per token string; guarded by mutex.
	clientEntries map[string]*clientEntry
}
// clientEntry stored the client and its created timestamp
type clientEntry struct {
	client  ClientInterface // the per-token Kubernetes client
	created time.Time       // creation time; watchClients expires entries older than expirationTime
}
// GetClientFactory returns the process-wide client factory, creating it on
// first use. Safe for concurrent callers: the package-level factory pointer
// is only read and written under mutex (getClientFactory performs the
// guarded initialization, so two racing first callers still produce a
// single factory).
func GetClientFactory() (ClientFactory, error) {
	// Read the singleton under the read lock; the previous unguarded read
	// raced with the write in getClientFactory.
	mutex.RLock()
	existing := factory
	mutex.RUnlock()
	if existing != nil {
		return existing, nil
	}

	// Get the normal configuration
	config, err := ConfigClient()
	if err != nil {
		return nil, err
	}

	// Create a new config based on what was gathered above but don't specify the bearer token to use
	istioConfig := rest.Config{
		Host:            config.Host,
		TLSClientConfig: config.TLSClientConfig,
		QPS:             config.QPS,
		Burst:           config.Burst,
	}

	return getClientFactory(&istioConfig, expirationTime)
}
// getClientFactory allows the rest.Config and expiry duration to be
// specified explicitly, which makes it mock-friendly for tests. It
// initializes the package-level singleton exactly once; later calls return
// the already-built factory unchanged.
func getClientFactory(istioConfig *rest.Config, expiry time.Duration) (*clientFactory, error) {
	mutex.Lock()
	defer mutex.Unlock()

	if factory == nil {
		entries := make(map[string]*clientEntry)
		factory = &clientFactory{
			baseIstioConfig: istioConfig,
			clientEntries:   entries,
		}
		// Reap expired client entries in the background for the lifetime
		// of the process.
		go watchClients(entries, expiry)
	}

	return factory, nil
}
// newClient builds a fresh ClientInterface authenticated with the given
// user token, starting from a copy of the factory's base rest.Config.
func (cf *clientFactory) newClient(token string) (ClientInterface, error) {
	restConfig := *cf.baseIstioConfig
	restConfig.BearerToken = token

	// When the OpenID auth strategy is in use, Kiali can be configured to
	// reach the cluster API through a proxy (e.g. kube-oidc-proxy). This is
	// a workaround for Kubernetes-as-a-service providers that do not allow
	// configuring OpenID integration on the cluster API itself; in that
	// scenario all user requests must flow through the proxy rather than
	// hitting the API directly.
	//
	// The one exception is the Kiali service account's own token: requests
	// made with the SA token may bypass the proxy and talk to the cluster
	// API directly.
	conf := kialiConfig.Get()
	proxyConfigured := conf.Auth.Strategy == kialiConfig.AuthStrategyOpenId &&
		conf.Auth.OpenId.ApiProxy != "" &&
		conf.Auth.OpenId.ApiProxyCAData != ""

	if proxyConfigured {
		kialiToken, err := GetKialiToken()
		if err != nil {
			return nil, err
		}

		if token != kialiToken {
			// Reuse UseRemoteCreds as a helper to build a config that
			// points at the proxy with its CA data.
			proxyRestConfig, err := UseRemoteCreds(&RemoteSecret{
				Clusters: []RemoteSecretClusterListItem{
					{
						Cluster: RemoteSecretCluster{
							CertificateAuthorityData: conf.Auth.OpenId.ApiProxyCAData,
							Server:                   conf.Auth.OpenId.ApiProxy,
						},
						Name: "api_proxy",
					},
				},
			})
			if err != nil {
				return nil, err
			}
			restConfig.Host = proxyRestConfig.Host
			restConfig.TLSClientConfig = proxyRestConfig.TLSClientConfig
		}
	}

	return NewClientFromConfig(&restConfig)
}
// GetClient returns the client associated with the given token, creating
// and caching a new one when none exists yet.
func (cf *clientFactory) GetClient(token string) (ClientInterface, error) {
	entry, err := cf.getClientEntry(token)
	if err != nil {
		return nil, err
	}
	return entry.client, nil
}
// getClientEntry returns the cached clientEntry for the given token,
// creating and caching a new one on a miss. Concurrency-safe: the cache is
// read under the read lock, the (potentially slow) client construction runs
// unlocked, and the write re-checks the cache so two racing misses for the
// same token end up sharing a single client instead of the second silently
// overwriting the first.
func (cf *clientFactory) getClientEntry(token string) (*clientEntry, error) {
	mutex.RLock()
	entry, ok := cf.clientEntries[token]
	mutex.RUnlock()
	if ok {
		return entry, nil
	}

	// Cache miss: build the client outside the lock to avoid blocking other
	// tokens' lookups during construction.
	client, err := cf.newClient(token)
	if err != nil {
		log.Errorf("Error fetching the Kubernetes client: %v", err)
		return nil, err
	}

	newEntry := &clientEntry{
		client:  client,
		created: time.Now(),
	}

	mutex.Lock()
	// Another goroutine may have inserted an entry for this token while we
	// were building ours; keep the existing one.
	if existing, ok := cf.clientEntries[token]; ok {
		mutex.Unlock()
		return existing, nil
	}
	cf.clientEntries[token] = newEntry
	// Read the map length while still holding the lock; the previous code
	// read it after Unlock, racing with watchClients' deletes.
	clientCount := len(cf.clientEntries)
	mutex.Unlock()

	internalmetrics.SetKubernetesClients(clientCount)
	return newEntry, nil
}
// watchClients periodically sweeps the client cache, deleting entries older
// than expiry and updating the client-count metric.
//
// NOTE(review): this goroutine runs for the lifetime of the process — there
// is no stop channel or context. That is acceptable for the package-level
// singleton factory, but it cannot be shut down from tests.
func watchClients(clientEntries map[string]*clientEntry, expiry time.Duration) {
	// time.NewTicker is the idiomatic form for periodic work (vs. sleeping
	// in an infinite loop).
	ticker := time.NewTicker(expiry)
	defer ticker.Stop()

	for range ticker.C {
		mutex.Lock()
		for token, entry := range clientEntries {
			if time.Since(entry.created) > expiry {
				delete(clientEntries, token)
			}
		}
		internalmetrics.SetKubernetesClients(len(clientEntries))
		mutex.Unlock()
	}
}