/
controller.go
222 lines (184 loc) · 6.68 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
package session
import (
"context"
"fmt"
"os"
"sort"
"strings"
"time"
"github.com/tilt-dev/tilt/pkg/apis"
"github.com/tilt-dev/tilt/internal/engine/buildcontrol"
"github.com/tilt-dev/tilt/pkg/logger"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/tilt-dev/tilt/internal/store"
session "github.com/tilt-dev/tilt/pkg/apis/core/v1alpha1"
)
// Controller summarizes engine state of resources for the active Tilt session (i.e. invocation of up/ci).
//
// Part of the Session spec includes an exit condition, which is evaluated here and reflected on the Session status.
// The engine will react to changes in the status and exit once Done is true, propagating the error if one exists.
//
// While using an apiserver type and updating the corresponding entity in the apiserver itself, this is not currently
// a reconciler due to heavy dependence on engine internals. It's very likely this will look very different once it
// has been converted to a reconciler. (Ideally, there will also be much less special case conversion logic as the data
// models on which this controller depends evolve during migration to apiserver.)
type Controller struct {
// pid is the ID of the current process, captured once in NewController and
// reported as the PID on every Session status built by this controller.
pid int64
// startTime is the time at which NewController was called; it is reported
// as the StartTime on every Session status built by this controller.
startTime time.Time
// client is the API client used to create the Session object and to
// update its status.
client ctrlclient.Client
// The last status object sent to the server.
// It is recorded before the update call is made, and is nil until the
// first status update has been attempted.
lastStatus *session.SessionStatus
// The last session object returned by the server.
// Note that the server may annotate and transform this
// on top of what we sent.
// It is nil until the Session has been successfully created.
session *session.Session
}
// Compile-time check that *Controller implements store.Subscriber.
var _ store.Subscriber = (*Controller)(nil)
// NewController returns a Controller that uses cli to talk to the API server.
//
// The current process ID and the current time are captured here and reported
// on every Session status the controller produces. The Session object itself
// is not created until the first OnChange call that finds the engine ready.
func NewController(cli ctrlclient.Client) *Controller {
	ctrl := &Controller{client: cli}
	ctrl.pid = int64(os.Getpid())
	ctrl.startTime = time.Now()
	return ctrl
}
// OnChange recomputes the Session status from the current engine state and
// pushes it to the API server when it differs from the last status sent.
//
// Log-only changes are ignored. On the first relevant change the Session
// object is created; if that fails an error action is dispatched to the store,
// and if the engine is not ready yet the call is a no-op.
func (c *Controller) OnChange(ctx context.Context, st store.RStore, summary store.ChangeSummary) {
	if summary.IsLogOnly() {
		return
	}

	if c.session == nil {
		ready, err := c.initialize(ctx, st)
		switch {
		case err != nil:
			st.Dispatch(store.NewErrorAction(fmt.Errorf("failed to initialize Session controller: %v", err)))
			return
		case !ready:
			// engine is still starting up, nothing to do until it can be initialized
			return
		}
	}

	latest := c.makeLatestStatus(st)
	err := c.handleLatestStatus(ctx, st, latest)
	if err == nil {
		return
	}
	// Cancellation is detected by message text rather than by error identity,
	// and is not worth logging.
	if strings.Contains(err.Error(), context.Canceled.Error()) {
		return
	}
	logger.Get(ctx).Debugf("failed to update Session status: %v", err)
}
// initialize creates the Session API object for this Tilt invocation and
// stores it on the controller.
//
// It returns (false, nil) when the engine has not progressed far enough for a
// Session to be built yet (makeSession returned nil), (true, nil) once the
// object has been created, and (false, err) if the create call failed. The
// returned error wraps the underlying client error so that callers can inspect
// it with errors.Is / errors.As.
func (c *Controller) initialize(ctx context.Context, st store.RStore) (bool, error) {
	s := c.makeSession(st)
	if s == nil {
		return false, nil
	}

	// TODO(milas): rather than implicitly creating the Session object here, it should
	// 	be created explicitly as part of loading the Tiltfile
	if err := c.client.Create(ctx, s); err != nil {
		return false, fmt.Errorf("failed to create Session API object: %w", err)
	}

	// only remember the object once the server has accepted it
	c.session = s
	return true, nil
}
// makeSession builds the Session object for the current engine state.
//
// It returns nil while the Tiltfile path is still empty, i.e. the engine has
// not finished initialization. The store state is read-locked for the duration
// of the call.
func (c *Controller) makeSession(st store.RStore) *session.Session {
	state := st.RLockState()
	defer st.RUnlockState()

	tiltfilePath := state.TiltfilePath
	if tiltfilePath == "" {
		// Tiltfile hasn't been loaded yet
		return nil
	}

	// Manual + CI are currently the only supported modes. For any other engine
	// mode the exit condition is left at its zero value; the apiserver validates
	// this field and rejects the object on creation if it doesn't conform, so no
	// extra validation/error-handling happens here.
	var exitCondition session.ExitCondition
	if state.EngineMode == store.EngineModeUp {
		exitCondition = session.ExitConditionManual
	} else if state.EngineMode == store.EngineModeCI {
		exitCondition = session.ExitConditionCI
	}

	return &session.Session{
		ObjectMeta: metav1.ObjectMeta{Name: "Tiltfile"},
		Spec: session.SessionSpec{
			TiltfilePath:  tiltfilePath,
			ExitCondition: exitCondition,
		},
		Status: session.SessionStatus{
			PID:       c.pid,
			StartTime: apis.NewMicroTime(c.startTime),
		},
	}
}
// makeLatestStatus builds a fresh Session status from the current engine state.
//
// The status contains one target for the Tiltfile plus the targets of every
// manifest target in the state, sorted by name, and has the Session's exit
// condition applied to it. The store state is read-locked for the duration of
// the call. c.session must be non-nil.
func (c *Controller) makeLatestStatus(st store.RStore) *session.SessionStatus {
	state := st.RLockState()
	defer st.RUnlockState()

	latest := &session.SessionStatus{
		PID:       c.pid,
		StartTime: apis.NewMicroTime(c.startTime),
	}

	// the Tiltfile is always reported as a target
	latest.Targets = append(latest.Targets, tiltfileTarget(state))

	// The "holds" explain why resources (and thus all of their targets) are
	// waiting. N.B. which target is "next" to build is irrelevant here, but the
	// holds are only available alongside it.
	_, holds := buildcontrol.NextTargetToBuild(state)
	for _, mt := range state.ManifestTargets {
		latest.Targets = append(latest.Targets, targetsForResource(mt, holds)...)
	}

	// sort in place so that ordering is consistent and doesn't cause unnecessary updates
	targets := latest.Targets
	sort.SliceStable(targets, func(a, b int) bool {
		return targets[a].Name < targets[b].Name
	})

	processExitCondition(c.session.Spec.ExitCondition, latest)
	return latest
}
// handleLatestStatus sends newStatus to the API server if it differs from the
// last status that was sent, and on success dispatches a status-update action
// to the store.
//
// It returns nil when there is nothing to send or the update succeeded, and
// the client error when the update failed.
func (c *Controller) handleLatestStatus(ctx context.Context, st store.RStore, newStatus *session.SessionStatus) error {
	// Compare against what we last sent rather than what the server returned,
	// so server-side changes can't affect the equality check.
	unchanged := equality.Semantic.DeepEqual(c.lastStatus, newStatus)
	if unchanged {
		return nil
	}

	// NOTE(review): this is recorded before the update is attempted, so a
	// failed update is not retried until the computed status changes again —
	// confirm that this is intended.
	c.lastStatus = newStatus.DeepCopy()

	// work on a copy so that the local session isn't tainted if the update fails
	next := c.session.DeepCopy()
	next.Status = *newStatus

	err := c.client.Status().Update(ctx, next)
	if err != nil {
		return err
	}

	c.session = next
	st.Dispatch(NewSessionUpdateStatusAction(next))
	return nil
}
// processExitCondition evaluates exitCondition against the targets in status
// and records the outcome in status.Done and status.Error.
//
//   - ExitConditionManual: status is left untouched.
//   - ExitConditionCI: the session is done with an error as soon as any target
//     has terminated with an error, and done without an error once no target is
//     waiting, not ready, or (for jobs) still active, provided there is at
//     least one target in addition to the first.
//   - any other value: the session is done with an "unsupported exit
//     condition" error, and the targets are not evaluated.
func processExitCondition(exitCondition session.ExitCondition, status *session.SessionStatus) {
	switch exitCondition {
	case session.ExitConditionManual:
		return
	case session.ExitConditionCI:
		// evaluated below
	default:
		status.Done = true
		status.Error = fmt.Sprintf("unsupported exit condition: %s", exitCondition)
		// Stop here so the CI evaluation below can't overwrite this error
		// with a target's termination error.
		return
	}

	allResourcesOK := true
	for _, res := range status.Targets {
		if res.State.Waiting == nil && res.State.Active == nil && res.State.Terminated == nil {
			// if all states are nil, the target has not been requested to run, e.g. auto_init=False
			continue
		}

		if res.State.Terminated != nil && res.State.Terminated.Error != "" {
			// the first failed target (in slice order) ends the session
			status.Done = true
			status.Error = res.State.Terminated.Error
			return
		}

		if res.State.Waiting != nil {
			allResourcesOK = false
		} else if res.State.Active != nil && (!res.State.Active.Ready || res.Type == session.TargetTypeJob) {
			// jobs must run to completion
			allResourcesOK = false
		}
	}

	// Tiltfile is _always_ a target, so ensure that there's at least one other real target, or it's possible to
	// exit before the targets have actually been initialized
	if allResourcesOK && len(status.Targets) > 1 {
		status.Done = true
	}
}
// errToString converts err to its message, mapping a nil error to the empty string.
func errToString(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}