-
Notifications
You must be signed in to change notification settings - Fork 256
/
dashboard_parent_impl.go
169 lines (142 loc) · 5.47 KB
/
dashboard_parent_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package dashboardexecute
import (
"context"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"log"
"sync"
)
type DashboardParentImpl struct {
DashboardTreeRunImpl
children []dashboardtypes.DashboardTreeRun
childCompleteChan chan dashboardtypes.DashboardTreeRun
// are we blocked by a child run
blockedByChild bool
childStatusLock *sync.Mutex
}
func newDashboardParentImpl(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) DashboardParentImpl {
return DashboardParentImpl{
DashboardTreeRunImpl: NewDashboardTreeRunImpl(resource, parent, run, executionTree),
childStatusLock: new(sync.Mutex),
}
}
func (r *DashboardParentImpl) initialiseChildren(ctx context.Context) error {
var errors []error
for _, child := range r.children {
child.Initialise(ctx)
if err := child.GetError(); err != nil {
errors = append(errors, err)
}
}
return error_helpers.CombineErrors(errors...)
}
// GetChildren implements DashboardTreeRun
func (r *DashboardParentImpl) GetChildren() []dashboardtypes.DashboardTreeRun {
return r.children
}
// ChildrenComplete implements DashboardTreeRun
func (r *DashboardParentImpl) ChildrenComplete() bool {
for _, child := range r.children {
if !child.RunComplete() {
log.Printf("[TRACE] %s ChildrenComplete child %s NOT complete state %s", r.Name, child.GetName(), child.GetRunStatus())
return false
}
}
return true
}
func (r *DashboardParentImpl) ChildCompleteChan() chan dashboardtypes.DashboardTreeRun {
return r.childCompleteChan
}
func (r *DashboardParentImpl) createChildCompleteChan() {
// create buffered child complete chan
if childCount := len(r.children); childCount > 0 {
r.childCompleteChan = make(chan dashboardtypes.DashboardTreeRun, childCount)
}
}
// if this leaf run has children (including with runs) execute them asynchronously
func (r *DashboardParentImpl) executeChildrenAsync(ctx context.Context) {
for _, c := range r.children {
go c.Execute(ctx)
}
}
// if this leaf run has with runs execute them asynchronously
func (r *DashboardParentImpl) executeWithsAsync(ctx context.Context) {
for _, c := range r.children {
if c.GetNodeType() == modconfig.BlockTypeWith {
go c.Execute(ctx)
}
}
}
func (r *DashboardParentImpl) waitForChildrenAsync(ctx context.Context) chan error {
log.Printf("[TRACE] %s waitForChildrenAsync", r.Name)
var doneChan = make(chan error)
if len(r.children) == 0 {
log.Printf("[TRACE] %s waitForChildrenAsync - no children so we're done", r.Name)
// if there are no children, return a closed channel so we do not wait
close(doneChan)
return doneChan
}
go func() {
// wait for children to complete
var errors []error
for !(r.ChildrenComplete()) {
completeChild := <-r.childCompleteChan
log.Printf("[TRACE] %s waitForChildrenAsync got child complete for %s", r.Name, completeChild.GetName())
if completeChild.GetRunStatus().IsError() {
errors = append(errors, completeChild.GetError())
log.Printf("[TRACE] %s child %s has error %v", r.Name, completeChild.GetName(), completeChild.GetError())
}
}
log.Printf("[TRACE] %s ALL children and withs complete, errors: %v", r.Name, errors)
// so all children have completed - check for errors
// TODO [node_reuse] format better error https://github.com/turbot/steampipe/issues/2920
err := error_helpers.CombineErrors(errors...)
// if context is cancelled, just return context cancellation error
if ctx.Err() != nil {
err = ctx.Err()
}
doneChan <- err
}()
return doneChan
}
func (r *DashboardParentImpl) ChildStatusChanged(ctx context.Context) {
// this function may be called asyncronously by children
r.childStatusLock.Lock()
defer r.childStatusLock.Unlock()
// if we are currently blocked by a child or we are currently in running state,
// call setRunning() to determine whether any of our children are now blocked
if r.blockedByChild || r.GetRunStatus() == dashboardtypes.RunRunning {
log.Printf("[TRACE] %s ChildStatusChanged - calling setRunning to see if we are still running, status %s blockedByChild %v", r.Name, r.GetRunStatus(), r.blockedByChild)
// try setting our status to running again
r.setRunning(ctx)
}
}
// override DashboardTreeRunImpl) setStatus(
func (r *DashboardParentImpl) setRunning(ctx context.Context) {
// if the run is already complete (for example, canceled), do nothing
if r.GetRunStatus().IsFinished() {
log.Printf("[TRACE] %s setRunning - run already terminated - current state %s - NOT setting running", r.Name, r.GetRunStatus())
return
}
status := dashboardtypes.RunRunning
// if we are trying to set status to running, check if any of our children are blocked,
// and if so set our status to blocked
// if any children are blocked, we are blocked
for _, c := range r.children {
if c.GetRunStatus() == dashboardtypes.RunBlocked {
status = dashboardtypes.RunBlocked
r.blockedByChild = true
break
}
// to get here, no children can be blocked - clear blockedByChild
r.blockedByChild = false
}
// set status if it has changed
if status != r.GetRunStatus() {
log.Printf("[TRACE] %s setRunning - setting state %s, blockedByChild %v", r.Name, status, r.blockedByChild)
r.DashboardTreeRunImpl.setStatus(ctx, status)
} else {
log.Printf("[TRACE] %s setRunning - state unchanged %s, blockedByChild %v", r.Name, status, r.blockedByChild)
}
}