Skip to content

Commit

Permalink
engine: run local update commands with the reconciler (#4720)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Jul 1, 2021
1 parent b260740 commit fdfb049
Show file tree
Hide file tree
Showing 18 changed files with 356 additions and 119 deletions.
4 changes: 2 additions & 2 deletions internal/cli/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

169 changes: 138 additions & 31 deletions internal/controllers/core/cmd/controller.go
Expand Up @@ -42,10 +42,15 @@ type Controller struct {
procs map[types.NamespacedName]*currentProcess
proberManager ProberManager
updateCmds map[types.NamespacedName]*Cmd
mu sync.Mutex
client ctrlclient.Client
st store.RStore
clock clockwork.Clock

// Ensures that we're only updating one Cmd Status at a time.
updateMu sync.Mutex

// Ensures that we're only reonciling one Cmd at a time.
reconcileMu sync.Mutex
}

var _ store.TearDowner = &Controller{}
Expand All @@ -65,6 +70,8 @@ func NewController(ctx context.Context, execer Execer, proberManager ProberManag
}

func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
c.reconcileMu.Lock()
defer c.reconcileMu.Unlock()
err := c.reconcile(ctx, req.NamespacedName)
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -92,46 +99,84 @@ func (c *Controller) TearDown(ctx context.Context) {
}
}

// Fetch all the buttons that this object depends on.
func (c *Controller) buttons(ctx context.Context, cmd *v1alpha1.Cmd) (map[string]*v1alpha1.UIButton, error) {
startOn := cmd.Spec.StartOn
if startOn == nil {
return nil, nil
}

result := make(map[string]*v1alpha1.UIButton, len(startOn.UIButtons))
for _, n := range startOn.UIButtons {
b := &UIButton{}
err := c.client.Get(ctx, types.NamespacedName{Name: n}, b)
if err != nil {
return nil, err
}
result[n] = b
}
return result, nil
}

// Fetch all the filewatches that this object depends on.
func (c *Controller) fileWatches(ctx context.Context, cmd *v1alpha1.Cmd) (map[string]*v1alpha1.FileWatch, error) {
restartOn := cmd.Spec.RestartOn
if restartOn == nil {
return nil, nil
}

result := make(map[string]*v1alpha1.FileWatch, len(restartOn.FileWatches))
for _, n := range restartOn.FileWatches {
fw := &v1alpha1.FileWatch{}
err := c.client.Get(ctx, types.NamespacedName{Name: n}, fw)
if err != nil {
return nil, err
}
result[n] = fw
}
return result, nil
}

// Fetch the last time a start was requested from this target's dependencies.
func (c *Controller) lastStartEventTime(ctx context.Context, startOn *StartOnSpec) (time.Time, error) {
func (c *Controller) lastStartEventTime(startOn *StartOnSpec, buttons map[string]*v1alpha1.UIButton) time.Time {
cur := time.Time{}
if startOn == nil {
return cur, nil
return cur
}

for _, bn := range startOn.UIButtons {
b := &UIButton{}
err := c.client.Get(ctx, types.NamespacedName{Name: bn}, b)
if err != nil {
return cur, err
b, ok := buttons[bn]
if !ok {
// ignore missing buttons
continue
}
lastEventTime := b.Status.LastClickedAt
if !lastEventTime.Time.Before(startOn.StartAfter.Time) && lastEventTime.Time.After(cur) {
cur = lastEventTime.Time
}
}
return cur, nil
return cur
}

// Fetch the last time a restart was requested from this target's dependencies.
func (c *Controller) lastRestartEventTime(ctx context.Context, restartOn *RestartOnSpec) (time.Time, error) {
func (c *Controller) lastRestartEventTime(restartOn *RestartOnSpec, fileWatches map[string]*v1alpha1.FileWatch) time.Time {
cur := time.Time{}
if restartOn == nil {
return cur, nil
return cur
}

for _, fwn := range restartOn.FileWatches {
fw := &FileWatch{}
err := c.client.Get(ctx, types.NamespacedName{Name: fwn}, fw)
if err != nil {
return cur, err
fw, ok := fileWatches[fwn]
if !ok {
// ignore missing filewatches
continue
}
lastEventTime := fw.Status.LastEventTime
if lastEventTime.Time.After(cur) {
cur = lastEventTime.Time
}
}
return cur, nil
return cur
}

func (c *Controller) reconcile(ctx context.Context, name types.NamespacedName) error {
Expand All @@ -148,18 +193,30 @@ func (c *Controller) reconcile(ctx context.Context, name types.NamespacedName) e
return nil
}

lastRestartEventTime, err := c.lastRestartEventTime(ctx, cmd.Spec.RestartOn)
if cmd.ObjectMeta.Labels[v1alpha1.LabelOwnerKind] == v1alpha1.LabelOwnerKindTiltfile {
// Until resource dependencies are expressed in the API,
// we can't use reconciliation to deploy Cmd objects
// owned by the Tiltfile.
return nil
}

buttons, err := c.buttons(ctx, cmd)
if err != nil {
return err
}
lastStartEventTime, err := c.lastStartEventTime(ctx, cmd.Spec.StartOn)

fileWatches, err := c.fileWatches(ctx, cmd)
if err != nil {
return err
}

lastRestartEventTime := c.lastRestartEventTime(cmd.Spec.RestartOn, fileWatches)
lastStartEventTime := c.lastStartEventTime(cmd.Spec.StartOn, buttons)

proc, ok := c.procs[name]
// there is already a proc (which may or may not be running) for this cmd
if ok {

// any change to the spec means we should restart the process to pick up the changes
specChanged := !equality.Semantic.DeepEqual(proc.spec, cmd.Spec)
// a new restart event happened
Expand All @@ -170,28 +227,75 @@ func (c *Controller) reconcile(ctx context.Context, name types.NamespacedName) e
if !needsRestart {
return nil
}

// change the process's current number so that any further events received
// by the existing process will be considered out of date
proc.incrProcNum()

c.stop(name)
} else {
startOn := cmd.Spec.StartOn
startOnSpecIsEmpty := startOn == nil || len(startOn.UIButtons) == 0

// cmds with non-empty StartOn specs don't start until triggered
if !startOnSpecIsEmpty && lastStartEventTime.IsZero() {
c.setStatusWaitingOnStartOn(cmd)
return nil
}
}
_ = c.runInternal(ctx, cmd, buttons, fileWatches)
return nil
}

// Forces the command to run now.
//
// This is a hack to get local_resource commands into the API server,
// even though the API server doesn't have a notion of resource deps yet.
//
// Blocks until the command is finished, then returns its status.
func (c *Controller) ForceRun(ctx context.Context, cmd *v1alpha1.Cmd) (*v1alpha1.CmdStatus, error) {
c.reconcileMu.Lock()
doneCh := c.runInternal(ctx, cmd, nil, nil)
c.reconcileMu.Unlock()

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-doneCh:
}

c.updateMu.Lock()
defer c.updateMu.Unlock()

result, ok := c.updateCmds[types.NamespacedName{Name: cmd.Name}]
if !ok {
return nil, fmt.Errorf("internal error: no cmd status")
}

return result.Status.DeepCopy(), nil
}

// Runs the command unconditionally, stopping any currently running command.
//
// The filewatches and buttons are needed for bookkeeping on how the command
// was triggered.
//
// Returns a channel that closes when the Cmd is finished.
func (c *Controller) runInternal(ctx context.Context,
cmd *v1alpha1.Cmd,
buttons map[string]*v1alpha1.UIButton,
fileWatches map[string]*v1alpha1.FileWatch) (doneCh chan struct{}) {
name := types.NamespacedName{Name: cmd.Name}
proc, ok := c.procs[name]
if ok {
// change the process's current number so that any further events received
// by the existing process will be considered out of date
proc.incrProcNum()

c.stop(name)
} else {
proc = &currentProcess{}
c.procs[name] = proc
}

proc.spec = cmd.Spec
proc.isServer = cmd.ObjectMeta.Annotations[local.AnnotationOwnerKind] == "CmdServer"
proc.lastRestartOnEventTime = lastRestartEventTime
proc.lastStartOnEventTime = lastStartEventTime
proc.lastRestartOnEventTime = c.lastRestartEventTime(cmd.Spec.RestartOn, fileWatches)
proc.lastStartOnEventTime = c.lastStartEventTime(cmd.Spec.StartOn, buttons)
ctx, proc.cancelFunc = context.WithCancel(ctx)

c.resetStatus(name, cmd)
Expand Down Expand Up @@ -219,7 +323,10 @@ func (c *Controller) reconcile(ctx context.Context, name types.NamespacedName) e
},
}
}, stillHasSameProcNum)
return nil

proc.doneCh = make(chan struct{})
close(proc.doneCh)
return proc.doneCh
}
proc.probeWorker = probeWorker
}
Expand All @@ -236,7 +343,7 @@ func (c *Controller) reconcile(ctx context.Context, name types.NamespacedName) e

go c.processStatuses(ctx, statusCh, proc, name, startedAt, stillHasSameProcNum)

return nil
return proc.doneCh
}

func (c *Controller) processReadinessProbeStatusChange(ctx context.Context, name types.NamespacedName, stillHasSameProcNum func() bool) probe.StatusChangedFunc {
Expand Down Expand Up @@ -297,8 +404,8 @@ func processReadinessProbeResultLogger(ctx context.Context, stillHasSameProcNum
}

func (c *Controller) resetStatus(name types.NamespacedName, cmd *Cmd) {
c.mu.Lock()
defer c.mu.Unlock()
c.updateMu.Lock()
defer c.updateMu.Unlock()

updated := cmd.DeepCopy()
updated.Status = CmdStatus{
Expand All @@ -321,8 +428,8 @@ func (c *Controller) resetStatus(name types.NamespacedName, cmd *Cmd) {
//
// update() -> a pure function that applies a delta to the status object.
func (c *Controller) updateStatus(name types.NamespacedName, update func(status *CmdStatus), stillHasSameProcNum func() bool) {
c.mu.Lock()
defer c.mu.Unlock()
c.updateMu.Lock()
defer c.updateMu.Unlock()

if !stillHasSameProcNum() {
return
Expand Down
14 changes: 0 additions & 14 deletions internal/controllers/core/cmd/execer_test.go
Expand Up @@ -160,20 +160,6 @@ func TestExecCmd(t *testing.T) {
}
}

func TestExecCmdContext(t *testing.T) {
testCases := execTestCases()

l := logger.NewLogger(logger.NoneLvl, ioutil.Discard)
ctx := logger.WithLogger(context.Background(), l)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := localexec.ExecCmdContext(ctx, tc.cmd)
assertCommandEqual(t, tc.cmd, c)
})
}
}

type execTestCase struct {
name string
cmd model.Cmd
Expand Down
3 changes: 2 additions & 1 deletion internal/controllers/core/kubernetesapply/reconciler.go
Expand Up @@ -135,7 +135,8 @@ func (r *Reconciler) shouldDeployOnReconcile(
imageMaps map[types.NamespacedName]*v1alpha1.ImageMap) bool {
if ka.ObjectMeta.Labels[v1alpha1.LabelOwnerKind] == v1alpha1.LabelOwnerKindTiltfile {
// Until resource dependencies are expressed in the API,
// we can't use reconciliation to deploy KubernetesApply objects.
// we can't use reconciliation to deploy KubernetesApply objects
// owned by the Tiltfile.
return false
}

Expand Down

0 comments on commit fdfb049

Please sign in to comment.