-
Notifications
You must be signed in to change notification settings - Fork 263
/
executor.go
223 lines (188 loc) · 6.55 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package dashboardexecute
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
filehelpers "github.com/turbot/go-kit/files"
"github.com/turbot/steampipe/pkg/dashboard/dashboardevents"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/utils"
"github.com/turbot/steampipe/pkg/workspace"
)
type DashboardExecutor struct {
// map of executions, keyed by session id
executions map[string]*DashboardExecutionTree
executionLock sync.Mutex
// is this an interactive execution
// i.e. inputs may be specified _after_ execution starts
// false when running a single dashboard in batch mode
interactive bool
}
func newDashboardExecutor() *DashboardExecutor {
return &DashboardExecutor{
executions: make(map[string]*DashboardExecutionTree),
// default to interactive execution
interactive: true,
}
}
var Executor = newDashboardExecutor()
func (e *DashboardExecutor) ExecuteDashboard(ctx context.Context, sessionId, dashboardName string, inputs map[string]any, workspace *workspace.Workspace, client db_common.Client) (err error) {
var executionTree *DashboardExecutionTree
defer func() {
if err != nil && ctx.Err() != nil {
err = ctx.Err()
}
// if there was an error executing, send an ExecutionError event
if err != nil {
errorEvent := &dashboardevents.ExecutionError{
Error: err,
Session: sessionId,
Timestamp: time.Now(),
}
workspace.PublishDashboardEvent(ctx, errorEvent)
}
}()
// reset any existing executions for this session
e.CancelExecutionForSession(ctx, sessionId)
// now create a new execution
executionTree, err = NewDashboardExecutionTree(dashboardName, sessionId, client, workspace)
if err != nil {
return err
}
// if inputs must be provided before execution (i.e. this is a batch dashboard execution),
// verify all required inputs are provided
if err = e.validateInputs(executionTree, inputs); err != nil {
return err
}
// add to execution map
e.setExecution(sessionId, executionTree)
// if inputs have been passed, set them first
if len(inputs) > 0 {
executionTree.SetInputValues(inputs)
}
go executionTree.Execute(ctx)
return nil
}
// if inputs must be provided before execution (i.e. this is a batch dashboard execution),
// verify all required inputs are provided
func (e *DashboardExecutor) validateInputs(executionTree *DashboardExecutionTree, inputs map[string]any) error {
if e.interactive {
// interactive dashboard execution - no need to validate
return nil
}
var missingInputs []string
for _, inputName := range executionTree.InputRuntimeDependencies() {
if _, ok := inputs[inputName]; !ok {
missingInputs = append(missingInputs, inputName)
}
}
if missingCount := len(missingInputs); missingCount > 0 {
return fmt.Errorf("%s '%s' must be provided using '--dashboard-input name=value'", utils.Pluralize("input", missingCount), strings.Join(missingInputs, ","))
}
return nil
}
func (e *DashboardExecutor) LoadSnapshot(ctx context.Context, sessionId, snapshotName string, w *workspace.Workspace) (map[string]any, error) {
// find snapshot path in workspace
snapshotPath, ok := w.GetResourceMaps().Snapshots[snapshotName]
if !ok {
return nil, fmt.Errorf("snapshot %s not found in %s (%s)", snapshotName, w.Mod.Name(), w.Path)
}
if !filehelpers.FileExists(snapshotPath) {
return nil, fmt.Errorf("snapshot %s not does not exist", snapshotPath)
}
snapshotContent, err := os.ReadFile(snapshotPath)
if err != nil {
return nil, err
}
// deserialize the snapshot as an interface map
// we cannot deserialize into a SteampipeSnapshot struct
// (without custom derserialisation code) as the Panels property is an interface
snap := map[string]any{}
err = json.Unmarshal(snapshotContent, &snap)
if err != nil {
return nil, err
}
return snap, nil
}
func (e *DashboardExecutor) OnInputChanged(ctx context.Context, sessionId string, inputs map[string]any, changedInput string) error {
// find the execution
executionTree, found := e.executions[sessionId]
if !found {
return fmt.Errorf("no dashboard running for session %s", sessionId)
}
// get the previous value of this input
inputPrevValue := executionTree.inputValues[changedInput]
// first see if any other inputs rely on the one which was just changed
clearedInputs := e.clearDependentInputs(executionTree.Root, changedInput, inputs)
if len(clearedInputs) > 0 {
event := &dashboardevents.InputValuesCleared{
ClearedInputs: clearedInputs,
Session: executionTree.sessionId,
ExecutionId: executionTree.id,
}
executionTree.workspace.PublishDashboardEvent(ctx, event)
}
// if there are any dependent inputs, set their value to nil and send an event to the UI
// if the dashboard run is complete, just re-execute
if executionTree.GetRunStatus().IsFinished() || inputPrevValue != nil {
return e.ExecuteDashboard(
ctx,
sessionId,
executionTree.dashboardName,
inputs,
executionTree.workspace,
executionTree.client)
}
// set the inputs
executionTree.SetInputValues(inputs)
return nil
}
func (e *DashboardExecutor) clearDependentInputs(root dashboardtypes.DashboardTreeRun, changedInput string, inputs map[string]any) []string {
dependentInputs := root.GetInputsDependingOn(changedInput)
clearedInputs := dependentInputs
if len(dependentInputs) > 0 {
for _, inputName := range dependentInputs {
if inputs[inputName] != nil {
// clear the input value
inputs[inputName] = nil
childDependentInputs := e.clearDependentInputs(root, inputName, inputs)
clearedInputs = append(clearedInputs, childDependentInputs...)
}
}
}
return clearedInputs
}
func (e *DashboardExecutor) CancelExecutionForSession(_ context.Context, sessionId string) {
// find the execution
executionTree, found := e.getExecution(sessionId)
if !found {
// nothing to do
return
}
// cancel if in progress
executionTree.Cancel()
// remove from execution tree
e.removeExecution(sessionId)
}
// find the execution for the given session id
func (e *DashboardExecutor) getExecution(sessionId string) (*DashboardExecutionTree, bool) {
e.executionLock.Lock()
defer e.executionLock.Unlock()
executionTree, found := e.executions[sessionId]
return executionTree, found
}
func (e *DashboardExecutor) setExecution(sessionId string, executionTree *DashboardExecutionTree) {
e.executionLock.Lock()
defer e.executionLock.Unlock()
e.executions[sessionId] = executionTree
}
func (e *DashboardExecutor) removeExecution(sessionId string) {
e.executionLock.Lock()
defer e.executionLock.Unlock()
delete(e.executions, sessionId)
}