@@ -20,7 +20,13 @@ import (
20
20
"context"
21
21
"crypto/tls"
22
22
"crypto/x509"
23
+ "encoding/json"
23
24
"fmt"
25
+ "net"
26
+ "net/http"
27
+ "time"
28
+
29
+ "github.com/go-resty/resty/v2"
24
30
25
31
"github.com/go-logr/logr"
26
32
hazelcast "github.com/hazelcast/hazelcast-go-client"
@@ -130,8 +136,9 @@ func (o *KubeDBClientBuilder) GetTLSConfig() (*tls.Config, error) {
130
136
}
131
137
132
138
func (o * KubeDBClientBuilder ) GetHazelcastClient () (* Client , error ) {
133
- if o .podName == "" {
134
- o .url = o .ServiceURL ()
139
+ if o .podName != "" {
140
+ o .url = fmt .Sprintf ("%s.%s.%s.svc.cluster.local:%d" , o .podName , o .db .GoverningServiceName (), o .db .GetNamespace (), kubedb .HazelcastRestPort )
141
+
135
142
}
136
143
137
144
if o .url == "" {
@@ -183,6 +190,116 @@ func (o *KubeDBClientBuilder) GetHazelcastClient() (*Client, error) {
183
190
}, nil
184
191
}
185
192
193
+ type RestyConfig struct {
194
+ host string
195
+ transport * http.Transport
196
+ }
197
+
198
+ func (o * KubeDBClientBuilder ) GetHazelcastRestyClient () (* HZRestyClient , error ) {
199
+ if o .url == "" {
200
+ o .url = fmt .Sprintf ("%s://%s.%s.svc:%d" , o .db .GetConnectionScheme (), o .db .ServiceName (), o .db .GetNamespace (), kubedb .HazelcastRestPort )
201
+ }
202
+
203
+ config := RestyConfig {
204
+ host : o .url ,
205
+ transport : & http.Transport {
206
+ IdleConnTimeout : time .Second * 3 ,
207
+ DialContext : (& net.Dialer {
208
+ Timeout : time .Second * 30 ,
209
+ }).DialContext ,
210
+ },
211
+ }
212
+
213
+ var username , password string
214
+ if ! o .db .Spec .DisableSecurity {
215
+ user , pass , err := o .GetAuthCredentials ()
216
+ if err != nil {
217
+ return nil , err
218
+ }
219
+ username = user
220
+ password = pass
221
+ }
222
+
223
+ defaultTlsConfig , err := o .GetTLSConfig ()
224
+ if err != nil {
225
+ klog .Errorf ("Failed to get default tls config: %v" , err )
226
+ }
227
+ config .transport .TLSClientConfig = defaultTlsConfig
228
+ newClient := resty .New ()
229
+ newClient .SetTransport (config .transport ).SetScheme (o .db .GetConnectionScheme ()).SetBaseURL (config .host )
230
+ newClient .SetHeader ("Accept" , "application/json" )
231
+ newClient .SetBasicAuth (username , password )
232
+ newClient .SetTimeout (time .Second * 30 )
233
+
234
+ return & HZRestyClient {
235
+ Client : newClient ,
236
+ Config : & config ,
237
+ password : password ,
238
+ }, nil
239
+ }
240
+
241
+ func (client * HZRestyClient ) ChangeClusterState (state string ) (string , error ) {
242
+ req := client .Client .R ().SetDoNotParseResponse (true )
243
+ param := fmt .Sprintf ("admin&%s&%s" , client .password , state )
244
+ req .SetHeader ("Content-Type" , "application/json" )
245
+ req .SetBody (param )
246
+ res , err := req .Post ("/hazelcast/rest/management/cluster/changeState" )
247
+ if err != nil {
248
+ klog .Error (err , "Failed to send http request" )
249
+ return "" , err
250
+ }
251
+ if res != nil {
252
+ if res .IsError () {
253
+ klog .Error (res .Error ())
254
+ return "" , errors .New (fmt .Sprintf ("HTTP request failed: %v, StatusCode: %v" , res .Error (), res .StatusCode ()))
255
+ }
256
+ } else {
257
+ return "" , errors .New ("response can not be nil" )
258
+ }
259
+ body := res .RawBody ()
260
+ responseBody := make (map [string ]interface {})
261
+ if err := json .NewDecoder (body ).Decode (& responseBody ); err != nil {
262
+ return "" , fmt .Errorf ("failed to deserialize the response: %v" , err )
263
+ }
264
+ if val , ok := responseBody ["status" ]; ok {
265
+ if strValue , ok := val .(string ); ok {
266
+ return strValue , nil
267
+ }
268
+ return "" , errors .New ("failed to convert response to string" )
269
+ }
270
+ return "" , errors .New ("status is missing" )
271
+ }
272
+
273
+ func (client * HZRestyClient ) GetClusterState () (string , error ) {
274
+ req := client .Client .R ().SetDoNotParseResponse (true )
275
+
276
+ res , err := req .Get ("/hazelcast/health" )
277
+ if err != nil {
278
+ klog .Error (err , "Failed to send http request" )
279
+ return "" , err
280
+ }
281
+ if res != nil {
282
+ if res .IsError () {
283
+ klog .Error (res .Error ())
284
+ return "" , errors .New (fmt .Sprintf ("HTTP request failed: %v, StatusCode: %v" , res .Error (), res .StatusCode ()))
285
+ }
286
+ } else {
287
+ return "" , errors .New ("response can not be nil" )
288
+ }
289
+ body := res .RawBody ()
290
+ responseBody := make (map [string ]interface {})
291
+ if err := json .NewDecoder (body ).Decode (& responseBody ); err != nil {
292
+ return "" , fmt .Errorf ("failed to deserialize the response: %v" , err )
293
+ }
294
+ if val , ok := responseBody ["clusterState" ]; ok {
295
+ if strValue , ok := val .(string ); ok {
296
+ return strValue , nil
297
+ }
298
+ return "" , errors .New ("failed to convert response to string" )
299
+ }
300
+ return "" , errors .New ("status is missing" )
301
+ }
302
+
186
303
func (o * KubeDBClientBuilder ) ServiceURL () string {
187
304
return fmt .Sprintf ("%s.%s.svc:%d" , o .db .ServiceName (), o .db .GetNamespace (), kubedb .HazelcastRestPort )
188
305
}
0 commit comments