-
Notifications
You must be signed in to change notification settings - Fork 23
/
events.go
387 lines (358 loc) · 14.5 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
// Copyright 2018 Bull S.A.S. Atos Technologies - Bull, Rue Jean Jaures, B.P.68, 78340, Les Clayes-sous-Bois, France.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package events
import (
"context"
"encoding/json"
"path"
"strconv"
"strings"
"time"
"github.com/hashicorp/consul/api"
"github.com/pkg/errors"
"github.com/ystia/yorc/v3/helper/consulutil"
"github.com/ystia/yorc/v3/log"
)
// PublishAndLogAttributeValueChange registers an attribute value change event
// for a given instance of a given node, then records the change in the log API.
//
// The node and instance names are first merged into the optional log fields
// carried by ctx, and the event information is derived from that context
// before being completed with the attribute name and value. The status is
// forwarded to the event as given.
//
// It returns the id of the registered event. If building or registering the
// event fails, the error is returned with an empty id and no log entry is
// written.
func PublishAndLogAttributeValueChange(ctx context.Context, deploymentID, nodeName, instanceName, attributeName, value, status string) (string, error) {
	ctx = AddLogOptionalFields(ctx, LogOptionalFields{NodeID: nodeName, InstanceID: instanceName})

	evtInfo := buildInfoFromContext(ctx)
	evtInfo[ENodeID] = nodeName
	evtInfo[EInstanceID] = instanceName
	evtInfo[EAttributeName] = attributeName
	evtInfo[EAttributeValue] = value

	change, err := newStatusChange(StatusChangeTypeAttributeValue, evtInfo, deploymentID, status)
	if err != nil {
		return "", err
	}
	eventID, err := change.register()
	if err != nil {
		return "", err
	}

	logEntry := WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID)
	logEntry.Registerf("Attribute value for node %q, instance %q changed to %q", nodeName, instanceName, value)
	return eventID, nil
}
// PublishAndLogMapAttributeValueChange publishes and logs one attribute value
// change per entry of attributesValues (attribute name -> value) for a given
// instance of a given node.
//
// Entries are processed in map iteration order. Processing stops at the first
// failure and that error is returned; entries handled before the failure stay
// published. The ids of the published events are discarded.
func PublishAndLogMapAttributeValueChange(ctx context.Context, deploymentID, nodeName, instanceName string, attributesValues map[string]string, status string) error {
	for name, val := range attributesValues {
		if _, err := PublishAndLogAttributeValueChange(ctx, deploymentID, nodeName, instanceName, name, val, status); err != nil {
			return err
		}
	}
	return nil
}
// InstanceStatusChange publishes a status change for a given instance of a given node.
//
// It delegates to PublishAndLogInstanceStatusChange, forwarding kv,
// deploymentID, nodeName, instance and status unchanged, and returns the
// published event id together with any error from that call.
//
// Deprecated: Use PublishAndLogInstanceStatusChange instead
func InstanceStatusChange(kv *api.KV, deploymentID, nodeName, instance, status string) (string, error) {
	// Hand over a real (empty) context rather than nil: the callee feeds its
	// context straight into the log-fields helpers without a nil fallback.
	return PublishAndLogInstanceStatusChange(context.Background(), kv, deploymentID, nodeName, instance, status)
}
// PublishAndLogInstanceStatusChange publishes a status change for a given
// instance of a given node and logs this change into the log API.
//
// ctx may be nil, in which case a background context is used. The node and
// instance names are merged into the optional log fields carried by the
// context, and the event information is derived from it. The event is
// registered with the lower-cased status, while the log message reports the
// status exactly as given. kv is not used by this function.
//
// It returns the published event id. If building or registering the event
// fails, the error is returned with an empty id and no log entry is written.
func PublishAndLogInstanceStatusChange(ctx context.Context, kv *api.KV, deploymentID, nodeName, instance, status string) (string, error) {
	if ctx == nil {
		// Same tolerance as the other publish functions of this file: never
		// hand a nil context to the log-fields helpers.
		ctx = context.Background()
	}
	ctx = AddLogOptionalFields(ctx, LogOptionalFields{NodeID: nodeName, InstanceID: instance})

	info := buildInfoFromContext(ctx)
	info[ENodeID] = nodeName
	info[EInstanceID] = instance

	e, err := newStatusChange(StatusChangeTypeInstance, info, deploymentID, strings.ToLower(status))
	if err != nil {
		return "", err
	}
	id, err := e.register()
	if err != nil {
		return "", err
	}

	WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID).Registerf("Status for node %q, instance %q changed to %q", nodeName, instance, status)
	return id, nil
}
// DeploymentStatusChange publishes a status change for a given deployment.
//
// It delegates to PublishAndLogDeploymentStatusChange with a background
// context, forwarding kv, deploymentID and status unchanged.
//
// DeploymentStatusChange returns the published event id together with any
// error reported by that call.
//
// Deprecated: use PublishAndLogDeploymentStatusChange instead
func DeploymentStatusChange(kv *api.KV, deploymentID, status string) (string, error) {
	return PublishAndLogDeploymentStatusChange(context.Background(), kv, deploymentID, status)
}
// PublishAndLogDeploymentStatusChange registers a deployment status change
// event and records the change in the log API.
//
// The event is created without additional information (nil info) and with the
// lower-cased status; the log message reports the status exactly as given.
// kv is not used by this function.
//
// It returns the id of the registered event. If building or registering the
// event fails, the error is returned with an empty id and no log entry is
// written.
func PublishAndLogDeploymentStatusChange(ctx context.Context, kv *api.KV, deploymentID, status string) (string, error) {
	loweredStatus := strings.ToLower(status)

	change, err := newStatusChange(StatusChangeTypeDeployment, nil, deploymentID, loweredStatus)
	if err != nil {
		return "", err
	}
	eventID, err := change.register()
	if err != nil {
		return "", err
	}

	logEntry := WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID)
	logEntry.Registerf("Status for deployment %q changed to %q", deploymentID, status)
	return eventID, nil
}
// CustomCommandStatusChange publishes a status change for a custom command.
//
// It delegates to PublishAndLogCustomCommandStatusChange, forwarding kv,
// deploymentID, taskID and status unchanged.
//
// CustomCommandStatusChange returns the published event id together with any
// error reported by that call.
//
// Deprecated: use PublishAndLogCustomCommandStatusChange instead
func CustomCommandStatusChange(kv *api.KV, deploymentID, taskID, status string) (string, error) {
	// The nil context is deliberate: the callee replaces it with a background
	// context whose optional log fields carry taskID as the ExecutionID.
	return PublishAndLogCustomCommandStatusChange(nil, kv, deploymentID, taskID, status)
}
// PublishAndLogCustomCommandStatusChange registers a custom command status
// change event and records the change in the log API.
//
// When ctx is nil, a background context carrying taskID as the ExecutionID
// optional log field is used instead. The event information is derived from
// the context and completed with the task id. The event is created with the
// lower-cased status; the log message reports the status exactly as given.
// kv is not used by this function.
//
// It returns the id of the registered event. If building or registering the
// event fails, the error is returned with an empty id and no log entry is
// written.
func PublishAndLogCustomCommandStatusChange(ctx context.Context, kv *api.KV, deploymentID, taskID, status string) (string, error) {
	if ctx == nil {
		defaultFields := LogOptionalFields{ExecutionID: taskID}
		ctx = NewContext(context.Background(), defaultFields)
	}

	evtInfo := buildInfoFromContext(ctx)
	evtInfo[ETaskID] = taskID

	change, err := newStatusChange(StatusChangeTypeCustomCommand, evtInfo, deploymentID, strings.ToLower(status))
	if err != nil {
		return "", err
	}
	eventID, err := change.register()
	if err != nil {
		return "", err
	}

	logEntry := WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID)
	logEntry.Registerf("Status for custom-command %q changed to %q", taskID, status)
	return eventID, nil
}
// ScalingStatusChange publishes a status change for a scaling task.
//
// It delegates to PublishAndLogScalingStatusChange, forwarding kv,
// deploymentID, taskID and status unchanged.
//
// ScalingStatusChange returns the published event id together with any error
// reported by that call.
//
// Deprecated: use PublishAndLogScalingStatusChange instead
func ScalingStatusChange(kv *api.KV, deploymentID, taskID, status string) (string, error) {
	// The nil context is deliberate: the callee replaces it with a background
	// context whose optional log fields carry taskID as the ExecutionID.
	return PublishAndLogScalingStatusChange(nil, kv, deploymentID, taskID, status)
}
// PublishAndLogScalingStatusChange registers a scaling task status change
// event and records the change in the log API.
//
// When ctx is nil, a background context carrying taskID as the ExecutionID
// optional log field is used instead. The event information is derived from
// the context and completed with the task id. The event is created with the
// lower-cased status; the log message reports the status exactly as given.
// kv is not used by this function.
//
// It returns the id of the registered event. If building or registering the
// event fails, the error is returned with an empty id and no log entry is
// written.
func PublishAndLogScalingStatusChange(ctx context.Context, kv *api.KV, deploymentID, taskID, status string) (string, error) {
	if ctx == nil {
		defaultFields := LogOptionalFields{ExecutionID: taskID}
		ctx = NewContext(context.Background(), defaultFields)
	}

	evtInfo := buildInfoFromContext(ctx)
	evtInfo[ETaskID] = taskID

	change, err := newStatusChange(StatusChangeTypeScaling, evtInfo, deploymentID, strings.ToLower(status))
	if err != nil {
		return "", err
	}
	eventID, err := change.register()
	if err != nil {
		return "", err
	}

	logEntry := WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID)
	logEntry.Registerf("Status for scaling task %q changed to %q", taskID, status)
	return eventID, nil
}
// WorkflowStatusChange publishes a status change for a workflow task.
//
// It delegates to PublishAndLogWorkflowStatusChange, forwarding kv,
// deploymentID, taskID, workflowName (as the workflow id) and status unchanged.
//
// WorkflowStatusChange returns the published event id together with any error
// reported by that call.
//
// Deprecated: use PublishAndLogWorkflowStatusChange instead
func WorkflowStatusChange(kv *api.KV, deploymentID, taskID, workflowName, status string) (string, error) {
	// The nil context is deliberate: the callee replaces it with a background
	// context whose optional log fields carry taskID as the ExecutionID.
	return PublishAndLogWorkflowStatusChange(nil, kv, deploymentID, taskID, workflowName, status)
}
// PublishAndLogWorkflowStepStatusChange registers a workflow step status
// change event and records the change in the log API.
//
// When ctx is nil, a background context carrying taskID as the ExecutionID
// optional log field is used instead. wfStepInfo is mandatory: a nil value is
// rejected with an error. The event information is derived from the context
// and completed with the task id and every field of wfStepInfo. The event is
// created with the lower-cased status; the log message reports the status
// exactly as given. kv is not used by this function.
//
// It returns the id of the registered event. If building or registering the
// event fails, the error is returned with an empty id and no log entry is
// written.
func PublishAndLogWorkflowStepStatusChange(ctx context.Context, kv *api.KV, deploymentID, taskID string, wfStepInfo *WorkflowStepInfo, status string) (string, error) {
	if ctx == nil {
		defaultFields := LogOptionalFields{ExecutionID: taskID}
		ctx = NewContext(context.Background(), defaultFields)
	}
	if wfStepInfo == nil {
		return "", errors.Errorf("WorkflowStep information param must be provided")
	}

	evtInfo := buildInfoFromContext(ctx)
	evtInfo[ETaskID] = taskID
	// Workflow step coordinates.
	evtInfo[EWorkflowID] = wfStepInfo.WorkflowName
	evtInfo[EWorkflowStepID] = wfStepInfo.StepName
	evtInfo[EOperationName] = wfStepInfo.OperationName
	// Source and target of the step.
	evtInfo[ENodeID] = wfStepInfo.NodeName
	evtInfo[EInstanceID] = wfStepInfo.InstanceName
	evtInfo[ETargetNodeID] = wfStepInfo.TargetNodeID
	evtInfo[ETargetInstanceID] = wfStepInfo.TargetInstanceID

	change, err := newStatusChange(StatusChangeTypeWorkflowStep, evtInfo, deploymentID, strings.ToLower(status))
	if err != nil {
		return "", err
	}
	eventID, err := change.register()
	if err != nil {
		return "", err
	}

	logEntry := WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID)
	logEntry.Registerf("Status for workflow step %q changed to %q", wfStepInfo.StepName, status)
	return eventID, nil
}
// PublishAndLogAlienTaskStatusChange publishes a status change for a task
// execution and logs this change into the log API.
//
// When ctx is nil, a background context carrying taskID as the ExecutionID
// and taskExecutionID as the TaskExecutionID optional log fields is used
// instead. wfStepInfo is mandatory: a nil value is rejected with an error
// instead of being dereferenced. The event information is derived from the
// context and completed with the task id, the task execution id and every
// field of wfStepInfo. The event is created with the lower-cased status; the
// log message reports the status exactly as given. kv is not used by this
// function.
//
// PublishAndLogAlienTaskStatusChange returns the published event id. If
// building or registering the event fails, the error is returned with an
// empty id and no log entry is written.
func PublishAndLogAlienTaskStatusChange(ctx context.Context, kv *api.KV, deploymentID, taskID, taskExecutionID string, wfStepInfo *WorkflowStepInfo, status string) (string, error) {
	if ctx == nil {
		ctx = NewContext(context.Background(), LogOptionalFields{ExecutionID: taskID, TaskExecutionID: taskExecutionID})
	}
	// Same contract as PublishAndLogWorkflowStepStatusChange: fail cleanly
	// rather than panic on a nil step description.
	if wfStepInfo == nil {
		return "", errors.New("WorkflowStep information param must be provided")
	}

	info := buildInfoFromContext(ctx)
	info[ETaskID] = taskID
	// Warning: Alien task corresponds to what we call taskExecution
	info[ETaskExecutionID] = taskExecutionID
	info[EWorkflowID] = wfStepInfo.WorkflowName
	info[ENodeID] = wfStepInfo.NodeName
	info[EWorkflowStepID] = wfStepInfo.StepName
	info[EInstanceID] = wfStepInfo.InstanceName
	info[EOperationName] = wfStepInfo.OperationName
	info[ETargetNodeID] = wfStepInfo.TargetNodeID
	info[ETargetInstanceID] = wfStepInfo.TargetInstanceID

	e, err := newStatusChange(StatusChangeTypeAlienTask, info, deploymentID, strings.ToLower(status))
	if err != nil {
		return "", err
	}
	id, err := e.register()
	if err != nil {
		return "", err
	}

	WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID).Registerf("Status for task execution %q changed to %q", taskExecutionID, status)
	return id, nil
}
// PublishAndLogWorkflowStatusChange registers a workflow status change event
// and records the change in the log API.
//
// When ctx is nil, a background context carrying taskID as the ExecutionID
// optional log field is used instead. The event information is derived from
// the context and completed with the task id and the workflow id. The event
// is created with the lower-cased status; the log message reports the status
// exactly as given. kv is not used by this function.
//
// It returns the id of the registered event. If building or registering the
// event fails, the error is returned with an empty id and no log entry is
// written.
func PublishAndLogWorkflowStatusChange(ctx context.Context, kv *api.KV, deploymentID, taskID, workflowID, status string) (string, error) {
	if ctx == nil {
		defaultFields := LogOptionalFields{ExecutionID: taskID}
		ctx = NewContext(context.Background(), defaultFields)
	}

	evtInfo := buildInfoFromContext(ctx)
	evtInfo[ETaskID] = taskID
	evtInfo[EWorkflowID] = workflowID

	change, err := newStatusChange(StatusChangeTypeWorkflow, evtInfo, deploymentID, strings.ToLower(status))
	if err != nil {
		return "", err
	}
	eventID, err := change.register()
	if err != nil {
		return "", err
	}

	logEntry := WithContextOptionalFields(ctx).NewLogEntry(LogLevelINFO, deploymentID)
	logEntry.Registerf("Status for workflow %q changed to %q", workflowID, status)
	return eventID, nil
}
// StatusEvents returns the raw status events stored under the events prefix,
// either for every deployment (empty deploymentID) or for the given one.
//
// The KV store is listed with waitIndex and timeout as query options, and
// only entries whose ModifyIndex is strictly greater than waitIndex are kept.
// Alongside the events, the LastIndex reported by the query metadata is
// returned.
//
// If the listing fails, an empty (non-nil) slice, index 0 and the error are
// returned. If the listing yields no query metadata, an empty slice, index 0
// and a nil error are returned.
func StatusEvents(kv *api.KV, deploymentID string, waitIndex uint64, timeout time.Duration) ([]json.RawMessage, uint64, error) {
	found := []json.RawMessage{}

	// path.Join ignores empty elements, so an empty deploymentID resolves to
	// the prefix shared by all deployments.
	prefix := path.Join(consulutil.EventsPrefix, deploymentID)

	opts := &api.QueryOptions{WaitIndex: waitIndex, WaitTime: timeout}
	pairs, meta, err := kv.List(prefix, opts)
	if err != nil {
		return found, 0, err
	}
	if meta == nil {
		return found, 0, nil
	}

	log.Debugf("Found %d events before accessing index[%q]", len(pairs), strconv.FormatUint(meta.LastIndex, 10))
	for _, pair := range pairs {
		if pair.ModifyIndex > waitIndex {
			found = append(found, pair.Value)
		}
	}
	log.Debugf("Found %d events after index", len(found))
	return found, meta.LastIndex, nil
}
// LogsEvents returns the raw log entries stored under the logs prefix, either
// for every deployment (empty deploymentID) or for the given one.
//
// The KV store is listed with waitIndex and timeout as query options, and
// only entries whose ModifyIndex is strictly greater than waitIndex are kept.
// Alongside the logs, the LastIndex reported by the query metadata is
// returned.
//
// If the listing fails, an empty (non-nil) slice, index 0 and the error are
// returned. If the listing yields no query metadata, an empty slice, index 0
// and a nil error are returned.
func LogsEvents(kv *api.KV, deploymentID string, waitIndex uint64, timeout time.Duration) ([]json.RawMessage, uint64, error) {
	found := []json.RawMessage{}

	// path.Join ignores empty elements, so an empty deploymentID resolves to
	// the prefix shared by all deployments.
	prefix := path.Join(consulutil.LogsPrefix, deploymentID)

	opts := &api.QueryOptions{WaitIndex: waitIndex, WaitTime: timeout}
	pairs, meta, err := kv.List(prefix, opts)
	if err != nil {
		return found, 0, err
	}
	if meta == nil {
		return found, 0, nil
	}

	log.Debugf("Found %d logs before accessing index[%q]", len(pairs), strconv.FormatUint(meta.LastIndex, 10))
	for _, pair := range pairs {
		if pair.ModifyIndex > waitIndex {
			found = append(found, pair.Value)
		}
	}
	log.Debugf("Found %d logs after index", len(found))
	return found, meta.LastIndex, nil
}
// GetStatusEventsIndex reads the events key of the given deployment and
// returns the LastIndex reported by the query metadata of that read.
//
// The read error, if any, is returned as is; a read that yields no query
// metadata is reported as an error too. In both cases the index is 0.
func GetStatusEventsIndex(kv *api.KV, deploymentID string) (uint64, error) {
	eventsKey := path.Join(consulutil.EventsPrefix, deploymentID)
	_, meta, err := kv.Get(eventsKey, nil)
	switch {
	case err != nil:
		return 0, err
	case meta == nil:
		return 0, errors.New("Failed to retrieve last index for events")
	}
	return meta.LastIndex, nil
}
// GetLogsEventsIndex reads the logs key of the given deployment and returns
// the LastIndex reported by the query metadata of that read.
//
// The read error, if any, is returned as is; a read that yields no query
// metadata is reported as an error too. In both cases the index is 0.
func GetLogsEventsIndex(kv *api.KV, deploymentID string) (uint64, error) {
	logsKey := path.Join(consulutil.LogsPrefix, deploymentID)
	_, meta, err := kv.Get(logsKey, nil)
	switch {
	case err != nil:
		return 0, err
	case meta == nil:
		return 0, errors.New("Failed to retrieve last index for logs")
	}
	return meta.LastIndex, nil
}
// buildInfoFromContext creates the event information map from the optional
// log fields carried by ctx.
//
// Each log field that has an event information counterpart (see
// bindEventInfoWithContext) is copied under that counterpart's key; the other
// fields are skipped. A fresh, possibly empty, map is always returned, so
// callers can add their own entries to it.
func buildInfoFromContext(ctx context.Context) Info {
	result := make(Info)

	fields, found := FromContext(ctx)
	if !found {
		return result
	}
	for field, val := range fields {
		if key, known := bindEventInfoWithContext(field); known {
			result[key] = val
		}
	}
	return result
}
// bindEventInfoWithContext maps a log optional field type to the matching
// event information type.
//
// The boolean result is true when f has a counterpart. Otherwise it is false
// and the returned InfoType is the zero value.
func bindEventInfoWithContext(f FieldType) (InfoType, bool) {
	switch f {
	case WorkFlowID:
		return EWorkflowID, true
	case ExecutionID:
		return ETaskID, true
	case NodeID:
		return ENodeID, true
	case InstanceID:
		return EInstanceID, true
	case OperationName:
		return EOperationName, true
	case TaskExecutionID:
		return ETaskExecutionID, true
	}

	var none InfoType
	return none, false
}