-
Notifications
You must be signed in to change notification settings - Fork 27
/
aggregate.go
358 lines (307 loc) · 11.4 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
package cloudmanager
import (
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/fatih/structs"
)
// createAggregateRequest holds the user's input for creating an aggregate.
// createAggregate converts it with structs.Map, so the `structs` tags below
// determine the key names of the parameters sent with the create call.
type createAggregateRequest struct {
	Name                 string   `structs:"name"`
	WorkingEnvironmentID string   `structs:"workingEnvironmentId"`
	NumberOfDisks        int      `structs:"numberOfDisks"`
	DiskSize             diskSize `structs:"diskSize"`
	// The remaining fields carry the omitempty tag option.
	// NOTE(review): presumably the structs package leaves them out of the map
	// when they hold their zero value — confirm against the structs docs.
	HomeNode           string `structs:"homeNode,omitempty"`
	ProviderVolumeType string `structs:"providerVolumeType,omitempty"`
	CapacityTier       string `structs:"capacityTier,omitempty"`
	Iops               int    `structs:"iops,omitempty"`
	Throughput         int    `structs:"throughput,omitempty"`
}
// diskSize is the size/unit pair used for the DiskSize field of
// createAggregateRequest.
type diskSize struct {
	Size int    `structs:"size"`
	Unit string `structs:"unit"`
}
// aggregateResult is one aggregate as decoded from the JSON array that
// getAggregate receives from the aggregates endpoint. getAggregate returns the
// zero value of this type when no aggregate with the requested name is found.
type aggregateResult struct {
	// Name is the field getAggregate matches against the requested name.
	Name              string           `json:"name"`
	AvailableCapacity capacity         `json:"availableCapacity"`
	TotalCapacity     capacity         `json:"totalCapacity"`
	UsedCapacity      capacity         `json:"usedCapacity"`
	Volumes           []volume         `json:"volumes"`
	ProviderVolumes   []providerVolume `json:"providerVolumes"`
	Disks             []disk           `json:"disks"`
	State             string           `json:"state"`
	EncryptionType    string           `json:"encryptionType"`
	EncryptionKeyID   string           `json:"encryptionKeyId"`
	IsRoot            bool             `json:"isRoot"`
	HomeNode          string           `json:"homeNode"`
	OwnerNode         string           `json:"ownerNode"`
	CapacityTier      string           `json:"capacityTier"`
	CapacityTierUsed  capacity         `json:"capacityTierUsed"`
	SidlEnabled       bool             `json:"sidlEnabled"`
	SnaplockType      string           `json:"snaplockType"`
}
// capacity is a size/unit pair decoded from the "size" and "unit" JSON keys of
// an API response. flattenCapacity turns it into a map with the size rendered
// as a decimal string.
type capacity struct {
	Size float64 `json:"size"`
	Unit string  `json:"unit"`
}
// volume is one entry of aggregateResult.Volumes. flattenVolumes converts a
// slice of these into a slice of snake_case keyed maps.
type volume struct {
	Name            string   `json:"name"`
	TotalSize       capacity `json:"totalSize"`
	UsedSize        capacity `json:"usedSize"`
	ThinProvisioned bool     `json:"thinProvisioned"`
	IsClone         bool     `json:"isClone"`
	RootVolume      bool     `json:"rootVolume"`
}
// providerVolume is one entry of aggregateResult.ProviderVolumes.
// flattenProviderVolumes converts a slice of these into a slice of snake_case
// keyed maps.
type providerVolume struct {
	ID         string   `json:"id"`
	Name       string   `json:"name"`
	Size       capacity `json:"size"`
	State      string   `json:"state"`
	Device     string   `json:"device"`
	InstanceID string   `json:"instanceId"`
	DiskType   string   `json:"diskType"`
	Encrypted  bool     `json:"encrypted"`
	Iops       int      `json:"iops"`
	Throughput int      `json:"throughput"`
}
// disk is one entry of aggregateResult.Disks. flattenDisks converts a slice of
// these into a slice of snake_case keyed maps.
type disk struct {
	Name             string           `json:"name"`
	Position         string           `json:"position"`
	OwnerNode        string           `json:"ownerNode"`
	Device           string           `json:"device"`
	VMDiskProperties vmDiskProperties `json:"vmDiskProperties"`
}
// vmDiskProperties is the nested "vmDiskProperties" object of a disk entry.
// flattenDisks exposes it under the "vm_disk_properties" key.
type vmDiskProperties struct {
	ObjectName         string `json:"objectName"`
	StorageAccountName string `json:"storageAccountName"`
	ContainerName      string `json:"containerName"`
}
// aggregateRequest identifies the working environment whose aggregates
// getAggregate lists; the ID is placed in the request URL.
type aggregateRequest struct {
	WorkingEnvironmentID string `structs:"workingEnvironmentId"`
}
// deleteAggregateRequest names the aggregate that deleteAggregate removes.
// deleteAggregate puts both fields into the URL path and sends no request
// parameters, so the `structs` tags are not used by that call.
type deleteAggregateRequest struct {
	WorkingEnvironmentID string `structs:"workingEnvironmentId"`
	Name                 string `structs:"name"`
}
// updateAggregateRequest is the input of updateAggregate, which posts it
// (converted with structs.Map) to the aggregate's "disks" endpoint.
type updateAggregateRequest struct {
	WorkingEnvironmentID string `structs:"workingEnvironmentId"`
	Name                 string `structs:"name"`
	// NOTE(review): presumably the number of disks to add, given that the
	// request goes to the ".../disks" endpoint — confirm against the API docs.
	NumberOfDisks int `structs:"numberOfDisks"`
}
// getAggregate lists the aggregates of the working environment identified by
// request.WorkingEnvironmentID and returns the one whose name equals name.
//
// The listing URL depends on the environment: "ON_PREM" environments use the
// fixed on-prem path, otherwise the API root is resolved through getAPIRoot
// and the working environment ID is sent as a query parameter for "Amazon"
// and as a path segment for every other provider.
//
// An error is returned when the API root cannot be resolved, the call fails,
// the response is rejected by apiResponseChecker, or the body cannot be
// decoded. When no aggregate matches, the zero aggregateResult is returned
// together with a nil error.
func (c *Client) getAggregate(request aggregateRequest, name string, sourceWorkingEnvironmentType string, clientID string) (aggregateResult, error) {
	const host = "CloudManagerHost"

	// Zero value handed back on every path that does not find a match.
	var none aggregateResult

	log.Printf("getAggregate %s", name)

	envID := request.WorkingEnvironmentID
	var endpoint string
	if sourceWorkingEnvironmentType == "ON_PREM" {
		endpoint = fmt.Sprintf("/occm/api/onprem/aggregates?workingEnvironmentId=%s", envID)
	} else {
		apiRoot, provider, rootErr := c.getAPIRoot(envID, clientID)
		if rootErr != nil {
			log.Print("getAggregate: Cannot get API root.")
			return none, rootErr
		}
		switch provider {
		case "Amazon":
			endpoint = fmt.Sprintf("%s/aggregates?workingEnvironmentId=%s", apiRoot, envID)
		default:
			endpoint = fmt.Sprintf("%s/aggregates/%s", apiRoot, envID)
		}
	}

	status, body, _, callErr := c.CallAPIMethod("GET", endpoint, nil, c.Token, host, clientID)
	if callErr != nil {
		log.Printf("getAggregate request failed. Response %v, err %v", body, callErr)
		return none, callErr
	}
	if checkErr := apiResponseChecker(status, body, "getAggregate"); checkErr != nil {
		return none, checkErr
	}

	var listed []aggregateResult
	if decodeErr := json.Unmarshal(body, &listed); decodeErr != nil {
		log.Print("Failed to unmarshall response from getAggregates")
		return none, decodeErr
	}
	log.Printf("getAggregate: get list of aggregates. %v", listed)
	log.Printf("Find the match one. %v", name)

	// The first aggregate with a matching name wins.
	for _, candidate := range listed {
		if candidate.Name != name {
			continue
		}
		log.Printf("Found aggregate: %#v state %s", candidate, candidate.State)
		return candidate, nil
	}

	log.Print("Cannot find the aggregate")
	return none, nil
}
// createAggregate creates the aggregate described by request and returns its
// details as reported by getAggregate once the creation has completed.
//
// The request is converted with structs.Map and posted to "<API root>/aggregates".
// While the API rejects the call with the 409 "ongoing operations" conflict,
// the call is retried every 150 seconds, at most 24 times (about one hour).
// Any other rejection is returned immediately. After an accepted call the
// function waits for completion through waitOnCompletion and then looks the
// aggregate up by name.
//
// An error is returned when the API root cannot be resolved, the call fails or
// is rejected, the conflict persists after the last retry, waiting for
// completion fails, or the follow-up lookups fail. On error the returned
// aggregateResult is the zero value.
func (c *Client) createAggregate(request *createAggregateRequest, clientID string) (aggregateResult, error) {
	log.Printf("createAggregate %v... ", request.Name)
	params := structs.Map(request)
	hostType := "CloudManagerHost"

	rootURL, _, err := c.getAPIRoot(request.WorkingEnvironmentID, clientID)
	if err != nil {
		log.Print("createAggregate: Cannot get API root.")
		return aggregateResult{}, err
	}
	baseURL := fmt.Sprintf("%s/aggregates", rootURL)

	retries := 0
	maxRetries := 24 // max retry 150 sec * 24 = 1hr
	for {
		log.Print("Call aggregate creation API... ", request.Name)
		statusCode, response, onCloudRequestID, err := c.CallAPIMethod("POST", baseURL, params, c.Token, hostType, clientID)
		if err != nil {
			log.Printf("createAggregate request failed: %s", request.Name)
			return aggregateResult{}, err
		}

		responseError := apiResponseChecker(statusCode, response, "createAggregate")
		if responseError != nil {
			// Only the "ongoing operations" conflict is worth retrying; every
			// other rejection is final.
			if !strings.Contains(responseError.Error(), "code: 409, message: {\"message\":\"Couldn't perform action Create Aggregate, because there are ongoing operations which might interfere with it") {
				return aggregateResult{}, responseError
			}
			if retries >= maxRetries {
				// The aggregate was never created: report the failure instead
				// of falling through to the lookup below, which would hand the
				// caller an empty aggregate with a nil error.
				log.Print("Failed: Reached aggregate creation max retries.")
				return aggregateResult{}, fmt.Errorf("createAggregate %s: reached max retries (%d): %w", request.Name, maxRetries, responseError)
			}
			retries++
			log.Print("Wait for 150 seconds... ", retries)
			time.Sleep(150 * time.Second)
			continue
		}

		// The call was accepted; wait for the creation to finish.
		log.Print("Wait for aggregate creation... ", request.Name)
		err = c.waitOnCompletion(onCloudRequestID, "Aggregate", "create", 15, 60, clientID)
		log.Print("Finish waiting... ", request.Name)
		if err != nil {
			return aggregateResult{}, err
		}
		break
	}

	// getAggregate needs the working environment type to pick the listing URL.
	workingEnvDetail, err := c.getWorkingEnvironmentInfo(request.WorkingEnvironmentID, clientID)
	if err != nil {
		log.Print("Cannot get working environment information.")
		return aggregateResult{}, err
	}

	aggregate, err := c.getAggregate(aggregateRequest{WorkingEnvironmentID: request.WorkingEnvironmentID}, request.Name, workingEnvDetail.WorkingEnvironmentType, clientID)
	if err != nil {
		return aggregateResult{}, err
	}
	log.Printf("Aggregate %v status %v", aggregate.Name, aggregate.State)
	return aggregate, nil
}
// deleteAggregate deletes the aggregate request.Name from the working
// environment request.WorkingEnvironmentID and waits for the deletion to
// complete.
//
// It issues a DELETE to "<API root>/aggregates/<working environment>/<name>"
// and returns an error when the API root cannot be resolved, the call fails,
// apiResponseChecker rejects the response, or waitOnCompletion fails.
func (c *Client) deleteAggregate(request deleteAggregateRequest, clientID string) error {
	log.Print("On deleteAggregate... ")

	apiRoot, _, rootErr := c.getAPIRoot(request.WorkingEnvironmentID, clientID)
	if rootErr != nil {
		log.Print("deleteAggregate: Cannot get API root.")
		return rootErr
	}

	target := fmt.Sprintf("%s/aggregates/%s/%s", apiRoot, request.WorkingEnvironmentID, request.Name)
	status, body, requestID, callErr := c.CallAPIMethod("DELETE", target, nil, c.Token, "CloudManagerHost", clientID)
	if callErr != nil {
		log.Print("deleteAggregate request failed")
		return callErr
	}
	if checkErr := apiResponseChecker(status, body, "deleteAggregate"); checkErr != nil {
		return checkErr
	}

	log.Print("Wait for aggregate deletion.")
	return c.waitOnCompletion(requestID, "Aggregate", "delete", 10, 60, clientID)
}
// updateAggregate posts request (converted with structs.Map) to
// "<API root>/aggregates/<working environment>/<name>/disks" and waits for the
// update to complete.
//
// It returns an error when the API root cannot be resolved, the call fails,
// apiResponseChecker rejects the response, or waitOnCompletion fails.
func (c *Client) updateAggregate(request updateAggregateRequest, clientID string) error {
	log.Print("updateAggregate... ")
	payload := structs.Map(request)

	apiRoot, _, rootErr := c.getAPIRoot(request.WorkingEnvironmentID, clientID)
	if rootErr != nil {
		log.Print("updateAggregate: Cannot get API root.")
		return rootErr
	}

	target := fmt.Sprintf("%s/aggregates/%s/%s/disks", apiRoot, request.WorkingEnvironmentID, request.Name)
	status, body, requestID, callErr := c.CallAPIMethod("POST", target, payload, c.Token, "CloudManagerHost", clientID)
	if callErr != nil {
		log.Print("updateAggregate request failed")
		return callErr
	}
	if checkErr := apiResponseChecker(status, body, "updateAggregate"); checkErr != nil {
		return checkErr
	}

	log.Print("Wait for aggregate update.")
	return c.waitOnCompletion(requestID, "Aggregate", "update", 10, 60, clientID)
}
// flattenCapacity converts a capacity into a map with two entries: "size",
// the Size rendered as a decimal string with the fewest digits that represent
// it exactly, and "unit", the Unit string unchanged.
func flattenCapacity(c capacity) interface{} {
	return map[string]interface{}{
		"size": strconv.FormatFloat(c.Size, 'f', -1, 64),
		"unit": c.Unit,
	}
}
// flattenDisks converts disks into a slice of maps, one per disk and in the
// same order, keyed "name", "position", "device", "owner_node" and
// "vm_disk_properties". The last key holds a nested map with "object_name",
// "storage_account_name" and "container_name". An empty or nil input yields
// an empty, non-nil slice.
func flattenDisks(d []disk) interface{} {
	flattened := make([]map[string]interface{}, len(d))
	for i := range d {
		props := d[i].VMDiskProperties
		flattened[i] = map[string]interface{}{
			"name":       d[i].Name,
			"position":   d[i].Position,
			"device":     d[i].Device,
			"owner_node": d[i].OwnerNode,
			"vm_disk_properties": map[string]interface{}{
				"object_name":          props.ObjectName,
				"storage_account_name": props.StorageAccountName,
				"container_name":       props.ContainerName,
			},
		}
	}
	return flattened
}
// flattenVolumes converts volumes into a slice of maps, one per volume and in
// the same order, keyed "name", "thin_provisioned", "root_volume", "is_clone",
// "total_size" and "used_size". The two size entries hold the result of
// flattenCapacity. An empty or nil input yields an empty, non-nil slice.
func flattenVolumes(v []volume) interface{} {
	flattened := make([]map[string]interface{}, len(v))
	for i := range v {
		flattened[i] = map[string]interface{}{
			"name":             v[i].Name,
			"thin_provisioned": v[i].ThinProvisioned,
			"root_volume":      v[i].RootVolume,
			"is_clone":         v[i].IsClone,
			"total_size":       flattenCapacity(v[i].TotalSize),
			"used_size":        flattenCapacity(v[i].UsedSize),
		}
	}
	return flattened
}
// flattenProviderVolumes converts provider volumes into a slice of maps, one
// per volume and in the same order, keyed "id", "name", "state", "device",
// "instance_id", "disk_type", "encrypted", "iops", "throughput" and "size".
// The "size" entry holds the result of flattenCapacity. An empty or nil input
// yields an empty, non-nil slice.
func flattenProviderVolumes(v []providerVolume) interface{} {
	flattened := make([]map[string]interface{}, len(v))
	for i := range v {
		flattened[i] = map[string]interface{}{
			"id":          v[i].ID,
			"name":        v[i].Name,
			"state":       v[i].State,
			"device":      v[i].Device,
			"instance_id": v[i].InstanceID,
			"disk_type":   v[i].DiskType,
			"encrypted":   v[i].Encrypted,
			"iops":        v[i].Iops,
			"throughput":  v[i].Throughput,
			"size":        flattenCapacity(v[i].Size),
		}
	}
	return flattened
}