-
Notifications
You must be signed in to change notification settings - Fork 263
/
runtime_dependency_subscriber_impl.go
460 lines (398 loc) · 16.9 KB
/
runtime_dependency_subscriber_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
package dashboardexecute
import (
"context"
"fmt"
"log"
"sync"
"github.com/turbot/go-kit/helpers"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"golang.org/x/exp/maps"
)
type RuntimeDependencySubscriberImpl struct {
// all RuntimeDependencySubscribers are also publishers as they have args/params
runtimeDependencyPublisherImpl
// if the underlying resource has a base resource, create a RuntimeDependencySubscriberImpl instance to handle
// generation and publication of runtime depdencies from the base resource
baseDependencySubscriber *RuntimeDependencySubscriberImpl
// map of runtime dependencies, keyed by dependency long name
runtimeDependencies map[string]*dashboardtypes.ResolvedRuntimeDependency
RawSQL string `json:"sql,omitempty"`
executeSQL string
// a list of the (scoped) names of any runtime dependencies that we rely on
RuntimeDependencyNames []string `json:"dependencies,omitempty"`
}
func NewRuntimeDependencySubscriber(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) *RuntimeDependencySubscriberImpl {
b := &RuntimeDependencySubscriberImpl{
runtimeDependencies: make(map[string]*dashboardtypes.ResolvedRuntimeDependency),
}
// create RuntimeDependencyPublisherImpl
// (we must create after creating the run as iut requires a ref to the run)
b.runtimeDependencyPublisherImpl = newRuntimeDependencyPublisherImpl(resource, parent, run, executionTree)
return b
}
// GetBaseDependencySubscriber implements RuntimeDependencySubscriber
func (s *RuntimeDependencySubscriberImpl) GetBaseDependencySubscriber() RuntimeDependencySubscriber {
return s.baseDependencySubscriber
}
// if the resource is a runtime dependency provider, create with runs and resolve dependencies
func (s *RuntimeDependencySubscriberImpl) initRuntimeDependencies(executionTree *DashboardExecutionTree) error {
if _, ok := s.resource.(modconfig.RuntimeDependencyProvider); !ok {
return nil
}
// if our underlying resource has a base which has runtime dependencies,
// create a RuntimeDependencySubscriberImpl for it
if err := s.initBaseRuntimeDependencySubscriber(executionTree); err != nil {
return err
}
// call into publisher to start any with runs
if err := s.runtimeDependencyPublisherImpl.initWiths(); err != nil {
return err
}
// resolve any runtime dependencies
return s.resolveRuntimeDependencies()
}
func (s *RuntimeDependencySubscriberImpl) initBaseRuntimeDependencySubscriber(executionTree *DashboardExecutionTree) error {
if base := s.resource.(modconfig.HclResource).GetBase(); base != nil {
if _, ok := base.(modconfig.RuntimeDependencyProvider); ok {
// create base dependency subscriber
// pass ourselves as 'run'
// - this is only used when sending update events, which will not happen for the baseDependencySubscriber
s.baseDependencySubscriber = NewRuntimeDependencySubscriber(base.(modconfig.DashboardLeafNode), nil, s, executionTree)
err := s.baseDependencySubscriber.initRuntimeDependencies(executionTree)
if err != nil {
return err
}
// create buffered channel for base with to report their completion
s.baseDependencySubscriber.createChildCompleteChan()
}
}
return nil
}
// if this node has runtime dependencies, find the publisher of the dependency and create a dashboardtypes.ResolvedRuntimeDependency
// which we use to resolve the values
func (s *RuntimeDependencySubscriberImpl) resolveRuntimeDependencies() error {
rdp, ok := s.resource.(modconfig.RuntimeDependencyProvider)
if !ok {
return nil
}
runtimeDependencies := rdp.GetRuntimeDependencies()
for n, d := range runtimeDependencies {
// find a runtime dependency publisher who can provider this runtime dependency
publisher := s.findRuntimeDependencyPublisher(d)
if publisher == nil {
// should never happen as validation should have caught this
return fmt.Errorf("cannot resolve runtime dependency %s", d.String())
}
// read name and dep into local loop vars to ensure correct value used when transform func is invoked
name := n
dep := d
// determine the function to use to retrieve the runtime dependency value
var opts []RuntimeDependencyPublishOption
switch dep.PropertyPath.ItemType {
case modconfig.BlockTypeWith:
// set a transform function to extract the requested with data
opts = append(opts, WithTransform(func(resolvedVal *dashboardtypes.ResolvedRuntimeDependencyValue) *dashboardtypes.ResolvedRuntimeDependencyValue {
transformedResolvedVal := &dashboardtypes.ResolvedRuntimeDependencyValue{Error: resolvedVal.Error}
if resolvedVal.Error == nil {
// the runtime dependency value for a 'with' is *dashboardtypes.LeafData
withValue, err := s.getWithValue(name, resolvedVal.Value.(*dashboardtypes.LeafData), dep.PropertyPath)
if err != nil {
transformedResolvedVal.Error = fmt.Errorf("failed to resolve with value '%s' for %s: %s", dep.PropertyPath.Original, name, err.Error())
} else {
transformedResolvedVal.Value = withValue
}
}
return transformedResolvedVal
}))
}
// subscribe, passing a function which invokes getWithValue to resolve the required with value
valueChannel := publisher.SubscribeToRuntimeDependency(d.SourceResourceName(), opts...)
publisherName := publisher.GetName()
s.runtimeDependencies[name] = dashboardtypes.NewResolvedRuntimeDependency(dep, valueChannel, publisherName)
}
return nil
}
func (s *RuntimeDependencySubscriberImpl) findRuntimeDependencyPublisher(runtimeDependency *modconfig.RuntimeDependency) RuntimeDependencyPublisher {
// the runtime dependency publisher is either the root dashboard run,
// or if this resource (or in case of a node/edge, the resource parent) has a base,
// the baseDependencySubscriber for that base
var subscriber RuntimeDependencySubscriber = s
if s.NodeType == modconfig.BlockTypeNode || s.NodeType == modconfig.BlockTypeEdge {
subscriber = s.parent.(RuntimeDependencySubscriber)
}
baseSubscriber := subscriber.GetBaseDependencySubscriber()
// "if I have a base with runtime dependencies, those dependencies must be provided BY THE BASE"
// check the provider property on the runtime dependency
// - if the matches the underlying resource for the baseDependencySubscriber,
// then baseDependencySubscriber _should_ be the dependency publisher
if !helpers.IsNil(baseSubscriber) && runtimeDependency.Provider == baseSubscriber.GetResource() {
if baseSubscriber.ProvidesRuntimeDependency(runtimeDependency) {
return baseSubscriber
}
// unexpected
log.Printf("[WARN] dependency %s has a dependency provider matching the base resource %s but the BaseDependencySubscriber does not provider the runtime dependency",
runtimeDependency.String(), baseSubscriber.GetName())
return nil
}
// "if I am a base resource with runtime dependencies, I provide my own dependencies"
// see if we can satisfy the dependency (this would occur when initialising the baseDependencySubscriber)
if s.ProvidesRuntimeDependency(runtimeDependency) {
return s
}
// "if I am a nested resource, my dashboard provides my dependencies"
// otherwise the dashboard run must be the publisher
dashboardRun := s.executionTree.runs[s.DashboardName].(RuntimeDependencyPublisher)
if dashboardRun.ProvidesRuntimeDependency(runtimeDependency) {
return dashboardRun
}
return nil
}
func (s *RuntimeDependencySubscriberImpl) evaluateRuntimeDependencies(ctx context.Context) error {
log.Printf("[TRACE] %s: evaluateRuntimeDependencies", s.Name)
// now wait for any runtime dependencies then resolve args and params
// (it is possible to have params but no sql)
if s.hasRuntimeDependencies() {
// if there are any unresolved runtime dependencies, wait for them
if err := s.waitForRuntimeDependencies(ctx); err != nil {
return err
}
log.Printf("[TRACE] %s: runtime dependencies availablem resolving sql and args", s.Name)
// ok now we have runtime dependencies, we can resolve the query
if err := s.resolveSQLAndArgs(); err != nil {
return err
}
// call the argsResolved callback in case anyone is waiting for the args
s.argsResolved(s.Args)
}
return nil
}
func (s *RuntimeDependencySubscriberImpl) waitForRuntimeDependencies(ctx context.Context) error {
log.Printf("[TRACE] %s: waitForRuntimeDependencies", s.Name)
if !s.hasRuntimeDependencies() {
log.Printf("[TRACE] %s: no runtime dependencies", s.Name)
return nil
}
// wait for base dependencies if we have any
if s.baseDependencySubscriber != nil {
log.Printf("[TRACE] %s: calling baseDependencySubscriber.waitForRuntimeDependencies", s.Name)
if err := s.baseDependencySubscriber.waitForRuntimeDependencies(ctx); err != nil {
return err
}
}
log.Printf("[TRACE] %s: checking whether all depdencies are resolved", s.Name)
allRuntimeDepsResolved := true
for _, dep := range s.runtimeDependencies {
if !dep.IsResolved() {
allRuntimeDepsResolved = false
log.Printf("[TRACE] %s: dependency %s is NOT resolved", s.Name, dep.Dependency.String())
}
}
if allRuntimeDepsResolved {
return nil
}
log.Printf("[TRACE] %s: BLOCKED", s.Name)
// set status to blocked
s.setStatus(ctx, dashboardtypes.RunBlocked)
var wg sync.WaitGroup
var errChan = make(chan error)
var doneChan = make(chan struct{})
for _, r := range s.runtimeDependencies {
if !r.IsResolved() {
// make copy of loop var for goroutine
resolvedDependency := r
log.Printf("[TRACE] %s: wait for %s", s.Name, resolvedDependency.Dependency.String())
wg.Add(1)
go func() {
defer wg.Done()
// block until the dependency is available
err := resolvedDependency.Resolve()
log.Printf("[TRACE] %s: Resolve returned for %s", s.Name, resolvedDependency.Dependency.String())
if err != nil {
log.Printf("[TRACE] %s: Resolve for %s returned error:L %s", s.Name, resolvedDependency.Dependency.String(), err.Error())
errChan <- err
}
}()
}
}
go func() {
log.Printf("[TRACE] %s: goroutine waiting for all runtime deps to be available", s.Name)
wg.Wait()
close(doneChan)
}()
var errors []error
wait_loop:
for {
select {
case err := <-errChan:
errors = append(errors, err)
case <-doneChan:
break wait_loop
case <-ctx.Done():
errors = append(errors, ctx.Err())
break wait_loop
}
}
log.Printf("[TRACE] %s: all runtime dependencies ready", s.resource.Name())
return error_helpers.CombineErrors(errors...)
}
func (s *RuntimeDependencySubscriberImpl) findRuntimeDependenciesForParentProperty(parentProperty string) []*dashboardtypes.ResolvedRuntimeDependency {
var res []*dashboardtypes.ResolvedRuntimeDependency
for _, dep := range s.runtimeDependencies {
if dep.Dependency.ParentPropertyName == parentProperty {
res = append(res, dep)
}
}
// also look at base subscriber
if s.baseDependencySubscriber != nil {
for _, dep := range s.baseDependencySubscriber.runtimeDependencies {
if dep.Dependency.ParentPropertyName == parentProperty {
res = append(res, dep)
}
}
}
return res
}
func (s *RuntimeDependencySubscriberImpl) findRuntimeDependencyForParentProperty(parentProperty string) *dashboardtypes.ResolvedRuntimeDependency {
res := s.findRuntimeDependenciesForParentProperty(parentProperty)
if len(res) > 1 {
panic(fmt.Sprintf("findRuntimeDependencyForParentProperty for %s, parent property %s, returned more that 1 result", s.Name, parentProperty))
}
if res == nil {
return nil
}
// return first result
return res[0]
}
// resolve the sql for this leaf run into the source sql and resolved args
func (s *RuntimeDependencySubscriberImpl) resolveSQLAndArgs() error {
log.Printf("[TRACE] %s: resolveSQLAndArgs", s.resource.Name())
queryProvider, ok := s.resource.(modconfig.QueryProvider)
if !ok {
// not a query provider - nothing to do
return nil
}
// convert arg runtime dependencies into arg map
runtimeArgs, err := s.buildRuntimeDependencyArgs()
if err != nil {
log.Printf("[TRACE] %s: buildRuntimeDependencyArgs failed: %s", s.resource.Name(), err.Error())
return err
}
// now if any param defaults had runtime dependencies, populate them
s.populateParamDefaults(queryProvider)
log.Printf("[TRACE] %s: built runtime args: %v", s.resource.Name(), runtimeArgs)
// does this leaf run have any SQL to execute?
if queryProvider.RequiresExecution(queryProvider) {
log.Printf("[TRACE] ResolveArgsFromQueryProvider for %s", queryProvider.Name())
resolvedQuery, err := s.executionTree.workspace.ResolveQueryFromQueryProvider(queryProvider, runtimeArgs)
if err != nil {
return err
}
s.RawSQL = resolvedQuery.RawSQL
s.executeSQL = resolvedQuery.ExecuteSQL
s.Args = resolvedQuery.Args
} else {
// otherwise just resolve the args
// merge the base args with the runtime args
runtimeArgs, err = modconfig.MergeArgs(queryProvider, runtimeArgs)
if err != nil {
return err
}
args, err := modconfig.ResolveArgs(queryProvider, runtimeArgs)
if err != nil {
return err
}
s.Args = args
}
return nil
}
func (s *RuntimeDependencySubscriberImpl) populateParamDefaults(provider modconfig.QueryProvider) {
paramDefs := provider.GetParams()
for _, paramDef := range paramDefs {
if dep := s.findRuntimeDependencyForParentProperty(paramDef.UnqualifiedName); dep != nil {
// assuming the default property is the target, set the default
if typehelpers.SafeString(dep.Dependency.TargetPropertyName) == "default" {
paramDef.SetDefault(dep.Value)
}
}
}
}
// convert runtime dependencies into arg map
func (s *RuntimeDependencySubscriberImpl) buildRuntimeDependencyArgs() (*modconfig.QueryArgs, error) {
res := modconfig.NewQueryArgs()
log.Printf("[TRACE] %s: buildRuntimeDependencyArgs - %d runtime dependencies", s.resource.Name(), len(s.runtimeDependencies))
// if the runtime dependencies use position args, get the max index and ensure the args array is large enough
maxArgIndex := -1
// build list of all args runtime dependencies
argRuntimeDependencies := s.findRuntimeDependenciesForParentProperty(modconfig.AttributeArgs)
for _, dep := range argRuntimeDependencies {
if dep.Dependency.TargetPropertyIndex != nil && *dep.Dependency.TargetPropertyIndex > maxArgIndex {
maxArgIndex = *dep.Dependency.TargetPropertyIndex
}
}
if maxArgIndex != -1 {
res.ArgList = make([]*string, maxArgIndex+1)
}
// now set the arg values
for _, dep := range argRuntimeDependencies {
if dep.Dependency.TargetPropertyName != nil {
err := res.SetNamedArgVal(dep.Value, *dep.Dependency.TargetPropertyName)
if err != nil {
return nil, err
}
} else {
if dep.Dependency.TargetPropertyIndex == nil {
return nil, fmt.Errorf("invalid runtime dependency - both ArgName and ArgIndex are nil ")
}
err := res.SetPositionalArgVal(dep.Value, *dep.Dependency.TargetPropertyIndex)
if err != nil {
return nil, err
}
}
}
return res, nil
}
// populate the list of runtime dependencies that this run depends on
func (s *RuntimeDependencySubscriberImpl) setRuntimeDependencies() {
names := make(map[string]struct{}, len(s.runtimeDependencies))
for _, d := range s.runtimeDependencies {
// add to DependencyWiths using ScopedName, i.e. <parent FullName>.<with UnqualifiedName>.
// we do this as there may be a with from a base resource with a clashing with name
// NOTE: this must be consistent with the naming in RuntimeDependencyPublisherImpl.createWithRuns
names[d.ScopedName()] = struct{}{}
}
// get base runtime dependencies (if any)
if s.baseDependencySubscriber != nil {
s.baseDependencySubscriber.setRuntimeDependencies()
s.RuntimeDependencyNames = append(s.RuntimeDependencyNames, s.baseDependencySubscriber.RuntimeDependencyNames...)
}
s.RuntimeDependencyNames = maps.Keys(names)
}
func (s *RuntimeDependencySubscriberImpl) hasRuntimeDependencies() bool {
return len(s.runtimeDependencies)+len(s.baseRuntimeDependencies()) > 0
}
func (s *RuntimeDependencySubscriberImpl) baseRuntimeDependencies() map[string]*dashboardtypes.ResolvedRuntimeDependency {
if s.baseDependencySubscriber == nil {
return map[string]*dashboardtypes.ResolvedRuntimeDependency{}
}
return s.baseDependencySubscriber.runtimeDependencies
}
// override DashboardParentImpl.executeChildrenAsync to also execute 'withs' of our baseRun
func (s *RuntimeDependencySubscriberImpl) executeChildrenAsync(ctx context.Context) {
// if we have a baseDependencySubscriber, execute it
if s.baseDependencySubscriber != nil {
go s.baseDependencySubscriber.executeWithsAsync(ctx)
}
// if this leaf run has children (including with runs) execute them asynchronously
// set RuntimeDependenciesOnly if needed
s.DashboardParentImpl.executeChildrenAsync(ctx)
}
// called when the args are resolved - if anyone is subscribing to the args value, publish
func (s *RuntimeDependencySubscriberImpl) argsResolved(args []any) {
if s.baseDependencySubscriber != nil {
s.baseDependencySubscriber.argsResolved(args)
}
s.runtimeDependencyPublisherImpl.argsResolved(args)
}