-
Notifications
You must be signed in to change notification settings - Fork 256
/
dashboard_execution_tree.go
346 lines (298 loc) · 10.7 KB
/
dashboard_execution_tree.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
package dashboardexecute
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/turbot/steampipe/pkg/connection_sync"
"github.com/turbot/steampipe/pkg/dashboard/dashboardevents"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
"github.com/turbot/steampipe/pkg/workspace"
"golang.org/x/exp/maps"
)
// DashboardExecutionTree is a structure representing the control result hierarchy
type DashboardExecutionTree struct {
Root dashboardtypes.DashboardTreeRun
dashboardName string
sessionId string
client db_common.Client
// map of executing runs, keyed by full name
runs map[string]dashboardtypes.DashboardTreeRun
workspace *workspace.Workspace
runComplete chan dashboardtypes.DashboardTreeRun
// map of subscribers to notify when an input value changes
cancel context.CancelFunc
inputLock sync.Mutex
inputValues map[string]any
id string
}
func NewDashboardExecutionTree(rootName string, sessionId string, client db_common.Client, workspace *workspace.Workspace) (*DashboardExecutionTree, error) {
// now populate the DashboardExecutionTree
executionTree := &DashboardExecutionTree{
dashboardName: rootName,
sessionId: sessionId,
client: client,
runs: make(map[string]dashboardtypes.DashboardTreeRun),
workspace: workspace,
runComplete: make(chan dashboardtypes.DashboardTreeRun, 1),
inputValues: make(map[string]any),
}
executionTree.id = fmt.Sprintf("%p", executionTree)
// create the root run node (either a report run or a counter run)
root, err := executionTree.createRootItem(rootName)
if err != nil {
return nil, err
}
executionTree.Root = root
return executionTree, nil
}
func (e *DashboardExecutionTree) createRootItem(rootName string) (dashboardtypes.DashboardTreeRun, error) {
parsedName, err := modconfig.ParseResourceName(rootName)
if err != nil {
return nil, err
}
fullName, err := parsedName.ToFullName()
if err != nil {
return nil, err
}
if parsedName.ItemType == "" {
return nil, fmt.Errorf("root item is not valid named resource")
}
// if no mod is specified, assume the workspace mod
if parsedName.Mod == "" {
parsedName.Mod = e.workspace.Mod.ShortName
rootName = fullName
}
switch parsedName.ItemType {
case modconfig.BlockTypeDashboard:
dashboard, ok := e.workspace.GetResourceMaps().Dashboards[rootName]
if !ok {
return nil, fmt.Errorf("dashboard '%s' does not exist in workspace", rootName)
}
return NewDashboardRun(dashboard, e, e)
case modconfig.BlockTypeBenchmark:
benchmark, ok := e.workspace.GetResourceMaps().Benchmarks[rootName]
if !ok {
return nil, fmt.Errorf("benchmark '%s' does not exist in workspace", rootName)
}
return NewCheckRun(benchmark, e, e)
case modconfig.BlockTypeQuery:
// wrap in a table
query, ok := e.workspace.GetResourceMaps().Queries[rootName]
if !ok {
return nil, fmt.Errorf("query '%s' does not exist in workspace", rootName)
}
// wrap this in a chart and a dashboard
dashboard, err := modconfig.NewQueryDashboard(query)
// TACTICAL - set the execution tree dashboard name from the query dashboard
e.dashboardName = dashboard.Name()
if err != nil {
return nil, err
}
return NewDashboardRun(dashboard, e, e)
case modconfig.BlockTypeControl:
// wrap in a table
control, ok := e.workspace.GetResourceMaps().Controls[rootName]
if !ok {
return nil, fmt.Errorf("query '%s' does not exist in workspace", rootName)
}
// wrap this in a chart and a dashboard
dashboard, err := modconfig.NewQueryDashboard(control)
if err != nil {
return nil, err
}
return NewDashboardRun(dashboard, e, e)
default:
return nil, fmt.Errorf("reporting type %s cannot be executed as dashboard", parsedName.ItemType)
}
}
func (e *DashboardExecutionTree) Execute(ctx context.Context) {
startTime := time.Now()
searchPath := e.client.GetRequiredSessionSearchPath()
// store context
cancelCtx, cancel := context.WithCancel(ctx)
e.cancel = cancel
workspace := e.workspace
// perform any necessary initialisation
// (e.g. check run creates the control execution tree)
e.Root.Initialise(cancelCtx)
if e.Root.GetError() != nil {
return
}
// TODO should we always wait even with non custom search path?
// if there is a custom search path, wait until the first connection of each plugin has loaded
if customSearchPath := e.client.GetCustomSearchPath(); customSearchPath != nil {
if err := connection_sync.WaitForSearchPathSchemas(ctx, e.client, customSearchPath); err != nil {
e.Root.SetError(ctx, err)
return
}
}
panels := e.BuildSnapshotPanels()
// build map of those variables referenced by the dashboard run
referencedVariables := GetReferencedVariables(e.Root, e.workspace)
immutablePanels, err := utils.JsonCloneToMap(panels)
if err != nil {
e.SetError(ctx, err)
return
}
workspace.PublishDashboardEvent(ctx, &dashboardevents.ExecutionStarted{
Root: e.Root,
Session: e.sessionId,
ExecutionId: e.id,
Panels: immutablePanels,
Inputs: e.inputValues,
Variables: referencedVariables,
StartTime: startTime,
})
defer func() {
e := &dashboardevents.ExecutionComplete{
Root: e.Root,
Session: e.sessionId,
ExecutionId: e.id,
Panels: panels,
Inputs: e.inputValues,
Variables: referencedVariables,
// search path elements are quoted (for consumption by postgres)
// unquote them
SearchPath: utils.UnquoteStringArray(searchPath),
StartTime: startTime,
EndTime: time.Now(),
}
workspace.PublishDashboardEvent(ctx, e)
}()
log.Println("[TRACE]", "begin DashboardExecutionTree.Execute")
defer log.Println("[TRACE]", "end DashboardExecutionTree.Execute")
if e.GetRunStatus().IsFinished() {
// there must be no nodes to execute
log.Println("[TRACE]", "execution tree already complete")
return
}
// execute synchronously
e.Root.Execute(cancelCtx)
}
// GetRunStatus returns the stats of the Root run
func (e *DashboardExecutionTree) GetRunStatus() dashboardtypes.RunStatus {
return e.Root.GetRunStatus()
}
// SetError sets the error on the Root run
func (e *DashboardExecutionTree) SetError(ctx context.Context, err error) {
e.Root.SetError(ctx, err)
}
// GetName implements DashboardParent
// use mod short name - this will be the root name for all child runs
func (e *DashboardExecutionTree) GetName() string {
return e.workspace.Mod.ShortName
}
// GetParent implements DashboardTreeRun
func (e *DashboardExecutionTree) GetParent() dashboardtypes.DashboardParent {
return nil
}
// GetNodeType implements DashboardTreeRun
func (*DashboardExecutionTree) GetNodeType() string {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) SetInputValues(inputValues map[string]any) {
log.Printf("[TRACE] SetInputValues")
e.inputLock.Lock()
defer e.inputLock.Unlock()
// we only support inputs if root is a dashboard (NOT a benchmark)
runtimeDependencyPublisher, ok := e.Root.(RuntimeDependencyPublisher)
if !ok {
// should never happen
log.Printf("[WARN] SetInputValues called but root Dashboard run is not a RuntimeDependencyPublisher: %s", e.Root.GetName())
return
}
for name, value := range inputValues {
log.Printf("[TRACE] DashboardExecutionTree SetInput %s = %v", name, value)
e.inputValues[name] = value
// publish runtime dependency
runtimeDependencyPublisher.PublishRuntimeDependencyValue(name, &dashboardtypes.ResolvedRuntimeDependencyValue{Value: value})
}
}
// ChildCompleteChan implements DashboardParent
func (e *DashboardExecutionTree) ChildCompleteChan() chan dashboardtypes.DashboardTreeRun {
return e.runComplete
}
// ChildStatusChanged implements DashboardParent
func (*DashboardExecutionTree) ChildStatusChanged(context.Context) {}
func (e *DashboardExecutionTree) Cancel() {
// if we have not completed, and already have a cancel function - cancel
if e.GetRunStatus().IsFinished() || e.cancel == nil {
log.Printf("[TRACE] DashboardExecutionTree Cancel NOT cancelling status %s cancel func %p", e.GetRunStatus(), e.cancel)
return
}
log.Printf("[TRACE] DashboardExecutionTree Cancel - calling cancel")
e.cancel()
// if there are any children, wait for the execution to complete
if !e.Root.RunComplete() {
<-e.runComplete
}
log.Printf("[TRACE] DashboardExecutionTree Cancel - all children complete")
}
func (e *DashboardExecutionTree) BuildSnapshotPanels() map[string]dashboardtypes.SnapshotPanel {
// just build from e.runs
res := map[string]dashboardtypes.SnapshotPanel{}
for name, run := range e.runs {
res[name] = run.(dashboardtypes.SnapshotPanel)
// special case handling for check runs
if checkRun, ok := run.(*CheckRun); ok {
checkRunChildren := checkRun.BuildSnapshotPanels(res)
for k, v := range checkRunChildren {
res[k] = v
}
}
}
return res
}
// InputRuntimeDependencies returns the names of all inputs which are runtime dependencies
func (e *DashboardExecutionTree) InputRuntimeDependencies() []string {
var deps = map[string]struct{}{}
for _, r := range e.runs {
if leafRun, ok := r.(*LeafRun); ok {
for _, r := range leafRun.runtimeDependencies {
if r.Dependency.PropertyPath.ItemType == modconfig.BlockTypeInput {
deps[r.Dependency.SourceResourceName()] = struct{}{}
}
}
}
}
return maps.Keys(deps)
}
// GetChildren implements DashboardParent
func (e *DashboardExecutionTree) GetChildren() []dashboardtypes.DashboardTreeRun {
return []dashboardtypes.DashboardTreeRun{e.Root}
}
// ChildrenComplete implements DashboardParent
func (e *DashboardExecutionTree) ChildrenComplete() bool {
return e.Root.RunComplete()
}
// Tactical: Empty implementations of DashboardParent functions
// TODO remove need for this
func (e *DashboardExecutionTree) Initialise(ctx context.Context) {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) GetTitle() string {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) GetError() error {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) SetComplete(ctx context.Context) {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) RunComplete() bool {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) GetInputsDependingOn(s string) []string {
panic("should never call for DashboardExecutionTree")
}
func (*DashboardExecutionTree) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
panic("should never call for DashboardExecutionTree")
}
func (*DashboardExecutionTree) GetResource() modconfig.DashboardLeafNode {
panic("should never call for DashboardExecutionTree")
}