-
Notifications
You must be signed in to change notification settings - Fork 263
/
runtime_dependency_publisher_impl.go
294 lines (262 loc) · 9.79 KB
/
runtime_dependency_publisher_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package dashboardexecute
import (
"context"
"encoding/json"
"fmt"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
"log"
"strconv"
"sync"
)
// runtimeDependencyPublisherImpl embeds DashboardParentImpl and adds the state used to
// publish runtime dependency values ('with' results and resolved args) to subscribers.
type runtimeDependencyPublisherImpl struct {
DashboardParentImpl
// Args is serialised to JSON under the key "args" (omitted when empty)
Args []any `json:"args,omitempty"`
// Params is serialised to JSON under the key "params" (omitted when empty);
// the constructor populates it from the resource when the resource is a modconfig.QueryProvider
Params []*modconfig.ParamDef `json:"params,omitempty"`
// subscriptions maps a runtime dependency name to the targets waiting for its value;
// entries are added by SubscribeToRuntimeDependency and removed by PublishRuntimeDependencyValue
subscriptions map[string][]*RuntimeDependencyPublishTarget
// withValueMutex is held for the duration of setWithValue
withValueMutex *sync.Mutex
// withRuns maps the unqualified name of each 'with' to the LeafRun created for it by createWithRuns
withRuns map[string]*LeafRun
// inputs is consulted by ProvidesRuntimeDependency for input dependencies
// NOTE(review): nothing in this part of the file populates it - presumably child structs do; confirm
inputs map[string]*modconfig.DashboardInput
}
// newRuntimeDependencyPublisherImpl builds a runtimeDependencyPublisherImpl for the given resource,
// with an initialised parent impl, empty subscription/input/with-run maps and a fresh mutex.
//
// When the resource is a modconfig.QueryProvider its params are copied onto the publisher, and the
// status is set to RunInitialized if the provider requires execution or has children.
func newRuntimeDependencyPublisherImpl(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) runtimeDependencyPublisherImpl {
	publisher := runtimeDependencyPublisherImpl{
		DashboardParentImpl: newDashboardParentImpl(resource, parent, run, executionTree),
		subscriptions:       make(map[string][]*RuntimeDependencyPublishTarget),
		inputs:              make(map[string]*modconfig.DashboardInput),
		withRuns:            make(map[string]*LeafRun),
		withValueMutex:      new(sync.Mutex),
	}

	// nothing more to do unless the resource is a query provider
	qp, isQueryProvider := resource.(modconfig.QueryProvider)
	if !isQueryProvider {
		return publisher
	}

	// store the params
	publisher.Params = qp.GetParams()

	// mark the run as initialized if there is anything to execute
	hasWork := qp.RequiresExecution(qp) || len(qp.GetChildren()) > 0
	if hasWork {
		publisher.Status = dashboardtypes.RunInitialized
	}
	return publisher
}
// Initialise is a no-op in this base implementation; the context argument is ignored.
func (p *runtimeDependencyPublisherImpl) Initialise(context.Context) {}
// Execute always panics in this base implementation - as the panic message states,
// it must be implemented by the child struct.
func (p *runtimeDependencyPublisherImpl) Execute(context.Context) {
panic("must be implemented by child struct")
}
// AsTreeNode always panics in this base implementation - as the panic message states,
// it must be implemented by the child struct.
func (p *runtimeDependencyPublisherImpl) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
panic("must be implemented by child struct")
}
// GetName returns the Name field (promoted from the embedded DashboardParentImpl).
func (p *runtimeDependencyPublisherImpl) GetName() string {
return p.Name
}
// ProvidesRuntimeDependency reports whether this publisher can satisfy the given runtime dependency.
//
// The check depends on the item type of the dependency property path:
//   - with:  the underlying resource must be a WithProvider declaring a with whose unqualified name
//     matches the dependency source resource name
//   - input: an input with the source resource name must be registered in p.inputs
//   - param: one of p.Params must have a short name equal to the property path name
//
// Any other item type yields false.
func (p *runtimeDependencyPublisherImpl) ProvidesRuntimeDependency(dependency *modconfig.RuntimeDependency) bool {
	sourceName := dependency.SourceResourceName()

	switch dependency.PropertyPath.ItemType {
	case modconfig.BlockTypeWith:
		// withRuns cannot be used here: if withs depend on each other this function may be
		// called before every run has been added, so inspect the resource's withs directly
		withProvider, isWithProvider := p.resource.(modconfig.WithProvider)
		if !isWithProvider {
			return false
		}
		for _, with := range withProvider.GetWiths() {
			if with.UnqualifiedName == sourceName {
				return true
			}
		}
		return false

	case modconfig.BlockTypeInput:
		input := p.inputs[sourceName]
		return input != nil

	case modconfig.BlockTypeParam:
		// match on the short name, not the resource name (which is the unqualified name)
		for _, param := range p.Params {
			if param.ShortName == dependency.PropertyPath.Name {
				return true
			}
		}
	}

	return false
}
// SubscribeToRuntimeDependency registers a new publish target for the named runtime dependency
// and returns the channel on which the resolved value will be delivered.
//
// Each option in opts is applied to the target before it is registered.
func (p *runtimeDependencyPublisherImpl) SubscribeToRuntimeDependency(name string, opts ...RuntimeDependencyPublishOption) chan *dashboardtypes.ResolvedRuntimeDependencyValue {
	// the channel has a buffer of 1 to avoid potential sync issues
	valueChan := make(chan *dashboardtypes.ResolvedRuntimeDependencyValue, 1)
	subscriber := &RuntimeDependencyPublishTarget{channel: valueChan}

	// let each option configure the target
	for i := range opts {
		opts[i](subscriber)
	}

	log.Printf("[TRACE] SubscribeToRuntimeDependency %s", name)

	// record the subscriber against the dependency name
	existing := p.subscriptions[name]
	p.subscriptions[name] = append(existing, subscriber)

	return subscriber.channel
}
// PublishRuntimeDependencyValue sends result to every target subscribed to the named dependency,
// closes each target channel after sending, and then removes all subscriptions for that name.
//
// If a target has a transform, the transformed value is sent instead of result.
func (p *runtimeDependencyPublisherImpl) PublishRuntimeDependencyValue(name string, result *dashboardtypes.ResolvedRuntimeDependencyValue) {
	subscribers := p.subscriptions[name]
	for _, subscriber := range subscribers {
		// result may be reused for other subscribers, so never mutate it -
		// only substitute the value returned by the transform
		toSend := result
		if subscriber.transform != nil {
			toSend = subscriber.transform(result)
		}
		subscriber.channel <- toSend
		close(subscriber.channel)
	}

	// the value has been delivered - drop the subscriptions
	delete(p.subscriptions, name)
}
// GetWithRuns returns the map of with runs held by this publisher (the map itself, not a copy).
func (p *runtimeDependencyPublisherImpl) GetWithRuns() map[string]*LeafRun {
return p.withRuns
}
// initWiths creates runs for the with blocks of the underlying resource.
//
// It does nothing (and returns nil) when the resource is not a modconfig.WithProvider;
// otherwise it returns whatever error createWithRuns reports.
func (p *runtimeDependencyPublisherImpl) initWiths() error {
	// with runs must be created BEFORE child runs are created and before runtime dependencies are added
	if withProvider, isWithProvider := p.resource.(modconfig.WithProvider); isWithProvider {
		return p.createWithRuns(withProvider.GetWiths(), p.executionTree)
	}
	return nil
}
// getWithValue accepts the raw with result (dashboardtypes.LeafData) and the property path,
// and extracts the appropriate data.
//
// name is the name of the with and is only used to build error messages.
//
// The return value is either:
//   - a slice holding the requested column value from every row (row index '*', the default), or
//   - the requested column value from a single row (numeric row index)
//
// An error is returned if result is nil, the property path is malformed, no column can be
// determined, the row index is not an integer, or the row index is out of range.
func (p *runtimeDependencyPublisherImpl) getWithValue(name string, result *dashboardtypes.LeafData, path *modconfig.ParsedPropertyPath) (any, error) {
	// guard against a missing result rather than panicking on the dereference below
	if result == nil {
		return nil, fmt.Errorf("reference to with '%s' has no result data", name)
	}

	// get the set of rows which will be used to generate the return value
	rows := result.Rows
	/*
		You can
		reference the whole table with:
			with.stuff1
		this is equivalent to:
			with.stuff1.rows
		and
			with.stuff1.rows[*]

		Rows is a list, and you can index it to get a single row:
			with.stuff1.rows[0]
		or splat it to get all rows:
			with.stuff1.rows[*]
		Each row, in turn, contains all the columns, so you can get a single column of a single row:
			with.stuff1.rows[0].a
		if you splat the row, then you can get an array of a single column from all rows. This would be passed to sql as an array:
			with.stuff1.rows[*].a
	*/

	// with.stuff1 -> PropertyPath will be ""
	// with.stuff1.rows -> PropertyPath will be "rows"
	// with.stuff1.rows[*] -> PropertyPath will be "rows.*"
	// with.stuff1.rows[0] -> PropertyPath will be "rows.0"
	// with.stuff1.rows[0].a -> PropertyPath will be "rows.0.a"
	const (
		rowsSegment    = 0
		rowsIdxSegment = 1
		columnSegment  = 2
	)
	segments := path.PropertyPath

	// if present, the first property path segment MUST be "rows",
	// and there can be no segments after the column segment
	if (len(segments) > rowsSegment && segments[rowsSegment] != "rows") || len(segments) > columnSegment+1 {
		return nil, fmt.Errorf("reference to with '%s' has invalid property path '%s'", name, path.Original)
	}

	// if no row is specified assume all
	rowIdxStr := "*"
	if len(segments) > rowsIdxSegment {
		// the segment after "rows" is the row idx (or '*')
		rowIdxStr = segments[rowsIdxSegment]
	}

	// determine which column to return
	var column string
	switch {
	case len(segments) > columnSegment:
		// a column was explicitly specified
		column = segments[columnSegment]
	case len(result.Columns) > 1:
		// we do not support returning all columns (yet)
		return nil, fmt.Errorf("reference to with '%s' is returning more than one column - not supported", name)
	case len(result.Columns) == 0:
		// no column to default to - fail cleanly instead of indexing an empty slice
		return nil, fmt.Errorf("reference to with '%s' is returning no columns", name)
	default:
		// exactly one column - use it
		column = result.Columns[0].Name
	}

	if rowIdxStr == "*" {
		return columnValuesFromRows(column, rows)
	}

	rowIdx, err := strconv.Atoi(rowIdxStr)
	if err != nil {
		return nil, fmt.Errorf("reference to with '%s' has invalid property path '%s' - cannot parse row idx '%s'", name, path.Original, rowIdxStr)
	}

	// do we have the requested row?
	// (Atoi accepts negative numbers, so the lower bound must be checked as well as the upper)
	if rowCount := len(rows); rowIdx < 0 || rowIdx >= rowCount {
		return nil, fmt.Errorf("reference to with '%s' has invalid row index '%d' - %d %s were returned", name, rowIdx, rowCount, utils.Pluralize("row", rowCount))
	}

	// so we are returning a single row
	row := rows[rowIdx]
	return row[column], nil
}
// columnValuesFromRows collects the value of the given column from every row, in row order.
//
// The result is returned as a []any with one entry per row. An error is returned if column is
// empty, or if any row does not contain the column.
func columnValuesFromRows(column string, rows []map[string]any) (any, error) {
	if len(column) == 0 {
		return nil, fmt.Errorf("columnValuesFromRows failed - no column specified")
	}

	values := make([]any, 0, len(rows))
	for _, r := range rows {
		value, found := r[column]
		if !found {
			return nil, fmt.Errorf("column %s does not exist", column)
		}
		values = append(values, value)
	}
	return values, nil
}
// setWithValue publishes the outcome of the given with run to everything subscribed to it,
// keyed by the unqualified name of the run's resource.
//
// The whole operation is performed while holding withValueMutex.
func (p *runtimeDependencyPublisherImpl) setWithValue(w *LeafRun) {
	p.withValueMutex.Lock()
	defer p.withValueMutex.Unlock()

	withName := w.resource.GetUnqualifiedName()

	// start from the run error; if there was an error, w.Data will be nil and w.err will be non-nil
	resolved := &dashboardtypes.ResolvedRuntimeDependencyValue{Error: w.err}
	if succeeded := w.err == nil; succeeded {
		populateData(w.Data, resolved)
	}

	p.PublishRuntimeDependencyValue(withName, resolved)
}
// populateData stores withData as the value of result.
//
// Before returning, every value in a column whose data type is "JSONB" or "JSON" is replaced,
// in place in withData.Rows, by its JSON string encoding. If any value fails to marshal, the
// error is stored on result, the result value is cleared and the function returns immediately.
func populateData(withData *dashboardtypes.LeafData, result *dashboardtypes.ResolvedRuntimeDependencyValue) {
	result.Value = withData

	// TACTICAL - if there are any JSON columns, convert them back to a JSON string
	// first identify the JSON columns
	var jsonColumnNames []string
	for _, col := range withData.Columns {
		switch col.DataType {
		case "JSONB", "JSON":
			jsonColumnNames = append(jsonColumnNames, col.Name)
		}
	}

	// then re-encode the value of each of those columns, row by row
	for _, colName := range jsonColumnNames {
		for _, row := range withData.Rows {
			encoded, err := json.Marshal(row[colName])
			if err != nil {
				// publish result with the error
				result.Error = err
				result.Value = nil
				return
			}
			row[colName] = string(encoded)
		}
	}
}
// createWithRuns creates a LeafRun for each of the given withs, registering each run in
// p.withRuns (keyed by the unqualified name of the with) and appending it to p.children.
//
// It stops and returns the error as soon as a run cannot be created.
func (p *runtimeDependencyPublisherImpl) createWithRuns(withs []*modconfig.DashboardWith, executionTree *DashboardExecutionTree) error {
	for i := range withs {
		with := withs[i]

		// NOTE: the run is named using the scoped name (<publisher name>.<with name>)
		scopedName := fmt.Sprintf("%s.%s", p.GetName(), with.UnqualifiedName)
		run, err := NewLeafRun(with, p, executionTree, setName(scopedName))
		if err != nil {
			return err
		}

		// when the run completes, publish its 'with' data
		run.onComplete = func() { p.setWithValue(run) }

		p.withRuns[with.UnqualifiedName] = run
		p.children = append(p.children, run)
	}
	return nil
}
// argsResolved is called when the args are resolved - for every param which has a corresponding
// arg (matched by position) and at least one subscriber, the arg value is published.
//
// NOTE: if there are fewer args than params the function returns as soon as the args are
// exhausted, without writing the trace log.
func (p *runtimeDependencyPublisherImpl) argsResolved(args []any) {
	argCount := len(args)

	// params supply the name under which each positional arg is published
	for idx, param := range p.Params {
		if idx == argCount {
			return
		}

		// skip params nobody has subscribed to
		key := param.UnqualifiedName
		if _, subscribed := p.subscriptions[key]; !subscribed {
			continue
		}

		resolved := &dashboardtypes.ResolvedRuntimeDependencyValue{Value: args[idx]}
		p.PublishRuntimeDependencyValue(key, resolved)
	}

	log.Printf("[TRACE] %s: argsResolved", p.Name)
}