@@ -17,14 +17,11 @@ limitations under the License.
17
17
package druid
18
18
19
19
import (
20
- "bytes"
21
20
"encoding/json"
22
21
"fmt"
23
- "log"
24
22
"time"
25
23
26
24
druidgo "github.com/grafadruid/go-druid"
27
- "github.com/hashicorp/go-retryablehttp"
28
25
"github.com/pkg/errors"
29
26
"k8s.io/klog/v2"
30
27
health "kmodules.xyz/client-go/tools/healthchecker"
@@ -117,55 +114,15 @@ func (c *Client) CheckDataSourceExistence() (bool, error) {
117
114
return false , errors .Wrap (err , "failed to marshal json response" )
118
115
}
119
116
rawMessage := json .RawMessage (jsonData )
120
- response , err := c .SubmitRequest (method , path , rawMessage )
121
- if err != nil {
122
- return false , err
123
- }
124
-
125
- exists , err := parseDatasourceExistenceQueryResponse (response )
126
- if err != nil {
127
- return false , errors .Wrap (err , "Failed to parse response of datasource existence request" )
128
- }
129
-
130
- if err := closeResponse (response ); err != nil {
131
- return exists , err
132
- }
133
- return exists , nil
134
- }
135
-
136
- func (c * Client ) SubmitRequest (method , path string , opts interface {}) (* druidgo.Response , error ) {
137
- res , err := c .NewRequest (method , path , opts )
138
- if err != nil {
139
- return nil , errors .Wrap (err , "failed to submit API request" )
140
- }
141
- http := retryablehttp .NewClient ()
142
-
143
- var b []byte
144
- buf := bytes .NewBuffer (b )
145
- http .Logger = log .New (buf , "" , 0 )
146
117
147
- resp , err := http .Do (res )
118
+ var result []map [string ]interface {}
119
+ _ , err = c .ExecuteRequest (method , path , rawMessage , & result )
148
120
if err != nil {
149
- return nil , err
150
- }
151
- response := & druidgo.Response {Response : resp }
152
- return response , nil
153
- }
154
-
155
- func parseDatasourceExistenceQueryResponse (res * druidgo.Response ) (bool , error ) {
156
- var responseBody []map [string ]interface {}
157
- if err := json .NewDecoder (res .Body ).Decode (& responseBody ); err != nil {
158
- return false , errors .Wrap (err , "failed to deserialize the response" )
121
+ klog .Error ("Failed to execute request" , err )
122
+ return false , err
159
123
}
160
- return len (responseBody ) != 0 , nil
161
- }
162
124
163
- func closeResponse (response * druidgo.Response ) error {
164
- err := response .Body .Close ()
165
- if err != nil {
166
- return errors .Wrap (err , "Failed to close the response body" )
167
- }
168
- return nil
125
+ return len (result ) > 0 , nil
169
126
}
170
127
171
128
// CheckDBReadWriteAccess checks read and write access in the DB
@@ -238,41 +195,25 @@ func (c *Client) GetData() (string, error) {
238
195
func (c * Client ) runSelectQuery () (string , error ) {
239
196
method := "POST"
240
197
path := "druid/v2/sql"
241
-
242
198
data := map [string ]interface {}{
243
199
"query" : "SELECT * FROM \" kubedb-datasource\" " ,
244
200
}
201
+
245
202
jsonData , err := json .Marshal (data )
246
203
if err != nil {
247
204
return "" , errors .Wrap (err , "failed to marshal query json data" )
248
205
}
249
206
rawMessage := json .RawMessage (jsonData )
250
- response , err := c .SubmitRequest (method , path , rawMessage )
251
- if err != nil {
252
- return "" , err
253
- }
254
- if response == nil {
255
- return "" , errors .New ("response body is empty" )
256
- }
257
207
258
- id , err := parseSelectQueryResponse (response , "id" )
208
+ var result []map [string ]interface {}
209
+ _ , err = c .ExecuteRequest (method , path , rawMessage , & result )
259
210
if err != nil {
260
- return "" , errors .Wrap (err , "failed to parse the response body" )
261
- }
262
-
263
- if err := closeResponse (response ); err != nil {
211
+ klog .Error ("Failed to execute POST query request" , err )
264
212
return "" , err
265
213
}
266
- return id .(string ), nil
267
- }
214
+ id := result [0 ]["id" ]
268
215
269
- func parseSelectQueryResponse (res * druidgo.Response , key string ) (interface {}, error ) {
270
- var responseBody []map [string ]interface {}
271
- if err := json .NewDecoder (res .Body ).Decode (& responseBody ); err != nil {
272
- return "" , errors .Wrap (err , "failed to deserialize the response" )
273
- }
274
- value := responseBody [0 ][key ]
275
- return value , nil
216
+ return id .(string ), nil
276
217
}
277
218
278
219
func (c * Client ) updateCoordinatorsWaitBeforeDeletingConfig (value int32 ) error {
@@ -296,11 +237,9 @@ func (c *Client) updateCoordinatorDynamicConfig(data map[string]interface{}) err
296
237
}
297
238
rawMessage := json .RawMessage (jsonData )
298
239
299
- response , err : = c .SubmitRequest (method , path , rawMessage )
240
+ _ , err = c .ExecuteRequest (method , path , rawMessage , nil )
300
241
if err != nil {
301
- return err
302
- }
303
- if err := closeResponse (response ); err != nil {
242
+ klog .Error ("Failed to execute coordinator config update request" , err )
304
243
return err
305
244
}
306
245
return nil
@@ -336,33 +275,19 @@ func (c *Client) submitTask(taskType DruidTaskType, dataSource string, data stri
336
275
} else {
337
276
task = GetKillTaskDefinition ()
338
277
}
339
-
340
278
rawMessage := json .RawMessage (task )
341
279
method := "POST"
342
280
path := "druid/indexer/v1/task"
343
281
344
- response , err := c .SubmitRequest (method , path , rawMessage )
345
- if err != nil {
346
- return "" , err
347
- }
348
-
349
- taskID , err := GetValueFromClusterResponse (response , "task" )
282
+ var result map [string ]interface {}
283
+ _ , err := c .ExecuteRequest (method , path , rawMessage , & result )
350
284
if err != nil {
351
- return "" , errors .Wrap (err , "failed to parse response of task api request" )
352
- }
353
- if err = closeResponse (response ); err != nil {
285
+ klog .Error ("Failed to execute POST ingestion or kill task request" , err )
354
286
return "" , err
355
287
}
356
- return fmt .Sprintf ("%v" , taskID ), nil
357
- }
358
288
359
- func GetValueFromClusterResponse (res * druidgo.Response , key string ) (interface {}, error ) {
360
- responseBody := make (map [string ]interface {})
361
- if err := json .NewDecoder (res .Body ).Decode (& responseBody ); err != nil {
362
- return "" , errors .Wrap (err , "failed to deserialize the response" )
363
- }
364
- value := responseBody [key ]
365
- return value , nil
289
+ taskID := result ["task" ]
290
+ return taskID .(string ), nil
366
291
}
367
292
368
293
func GetIngestionTaskDefinition (dataSource string , data string ) string {
@@ -419,21 +344,18 @@ func GetKillTaskDefinition() string {
419
344
func (c * Client ) CheckTaskStatus (taskID string ) (bool , error ) {
420
345
method := "GET"
421
346
path := fmt .Sprintf ("druid/indexer/v1/task/%s/status" , taskID )
422
- response , err := c .SubmitRequest (method , path , nil )
423
- if err != nil {
424
- return false , errors .Wrap (err , "failed to check task status" )
425
- }
426
347
427
- statusRes , err := GetValueFromClusterResponse (response , "status" )
348
+ var result map [string ]interface {}
349
+ _ , err := c .ExecuteRequest (method , path , nil , & result )
428
350
if err != nil {
429
- return false , errors .Wrap (err , "failed to parse respons of task ingestion request" )
351
+ klog .Error ("Failed to execute GET task status request" , err )
352
+ return false , err
430
353
}
354
+
355
+ statusRes := result ["status" ]
431
356
statusMap := statusRes .(map [string ]interface {})
432
357
status := statusMap ["status" ].(string )
433
358
434
- if err = closeResponse (response ); err != nil {
435
- return false , err
436
- }
437
359
return status == "SUCCESS" , nil
438
360
}
439
361
0 commit comments