Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 22 additions & 21 deletions controllers/executable_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func ensureExecutableFinalState(
}

// Performs an attempt to start an Executable run.
// Main runner is tried first (Spec.ExecutionType), and if that fails and fallback runners are allowed (Spec.FalbackExecutionTypes),
// Main runner is tried first (Spec.ExecutionType), and if that fails and fallback runners are allowed (Spec.FallbackExecutionTypes),
// they are in the order specified, one at a time.
// This function is called from the reconciliation loop.
func (r *ExecutableReconciler) startExecutable(
Expand Down Expand Up @@ -374,18 +374,20 @@ func (r *ExecutableReconciler) startExecutable(
r.runs.Update(exe.NamespacedName(), ri.RunID, ri.Clone())
}

environmentChanged := noChange
if ri.startupStage == StartupStageCertificateDataReady {
oc := r.computeExecutableEnvironment(ctx, exe, log, ri)
if oc != noChange {
var computed bool
computed, environmentChanged = r.computeExecutableEnvironment(ctx, exe, log, ri)
if !computed {
// Environment is not ready, or an error occurred; report it and let the next reconciliation try again.
return oc
return environmentChanged
}
}

if ri.startupStage >= StartupStageDefaultRunner && len(ri.startResults) == 0 { // Should never happen
log.Error(errors.New("inconsistent Executable startup state"), "No startup results found despite startup stage indicating that a startup attempt was made")
r.setExecutableState(exe, apiv1.ExecutableStateUnknown)
return statusChanged
return statusChanged | environmentChanged
}

var startResult *ExecutableStartResult = nil
Expand All @@ -396,7 +398,7 @@ func (r *ExecutableReconciler) startExecutable(
if ri.startupStage >= StartupStageDefaultRunner && startResult != nil && startResult.ExeState == apiv1.ExecutableStateStarting {
// Run attempt in progress, just make sure the Executable status is up to date.
// When the attempt finishes, OnStartupCompleted() will schedule another reconciliation.
return ri.ApplyTo(exe, log)
return ri.ApplyTo(exe, log) | environmentChanged
}

// Helper function to update Executable status when the start attempt was successful.
Expand All @@ -419,7 +421,7 @@ func (r *ExecutableReconciler) startExecutable(
// So first we need to check the result of the previous startup attempt (if any).
if startResult.IsSuccessfullyCompleted() {
handleSuccessfulStart(startResult)
return statusChanged
return statusChanged | environmentChanged
} else if startResult != nil {
runErr := startResult.StartupError
if runErr != nil {
Expand All @@ -443,7 +445,7 @@ func (r *ExecutableReconciler) startExecutable(
}
exe.Status.FinishTimestamp = exe.Status.StartupTimestamp
r.runs.DeleteByNamespacedName(exe.NamespacedName())
return statusChanged
return statusChanged | environmentChanged
}

ri.startupStage++
Expand Down Expand Up @@ -475,11 +477,11 @@ func (r *ExecutableReconciler) startExecutable(

if startResult.IsSuccessfullyCompleted() {
handleSuccessfulStart(startResult)
return statusChanged
return statusChanged | environmentChanged
}

// Asynchronous start in progress, wait for OnStartupCompleted() to be called
return additionalReconciliationNeeded
return additionalReconciliationNeeded | environmentChanged
}
}

Expand Down Expand Up @@ -985,11 +987,10 @@ func (r *ExecutableReconciler) computeEffectiveInvocationArgs(
}

// Computes effective environment and invocation arguments for the Executable.
// Returned objectChange indicates whether the method was successful or not.
// noChange means environment was computed successfully and we can proceed with startup.
// Any other value means there was an error and the caller should return allow another reconciliation loop iteration.
// Returned value indicates whether the method was successful or not, together with objectChange that indicates
// whether the Executable status was changed and whether another reconciliation iteration is needed.
// This method is called from the reconciliation loop.
func (r *ExecutableReconciler) computeExecutableEnvironment(ctx context.Context, exe *apiv1.Executable, log logr.Logger, ri *ExecutableRunInfo) objectChange {
func (r *ExecutableReconciler) computeExecutableEnvironment(ctx context.Context, exe *apiv1.Executable, log logr.Logger, ri *ExecutableRunInfo) (bool, objectChange) {
// Ports reserved for services that the Executable implements without specifying the desired port to use (via service-producer annotation).
reservedServicePorts := make(map[types.NamespacedName]int32)

Expand All @@ -1005,16 +1006,16 @@ func (r *ExecutableReconciler) computeExecutableEnvironment(ctx context.Context,
log.Info("Executable is being deleted while waiting to compute effective environment, aborting startup",
"Cause", err.Error())
markExecutableFailed()
return statusChanged
return false, statusChanged
} else {
log.Info("Could not compute effective environment for the Executable, retrying startup...",
"Cause", err.Error())
return r.setExecutableState(exe, apiv1.ExecutableStateStarting) | additionalReconciliationNeeded
return false, r.setExecutableState(exe, apiv1.ExecutableStateStarting) | additionalReconciliationNeeded
}
} else if err != nil {
log.Error(err, "Could not compute effective environment for the Executable")
markExecutableFailed()
return statusChanged
return false, statusChanged
}

err = r.computeEffectiveInvocationArgs(ctx, exe, reservedServicePorts, log)
Expand All @@ -1023,16 +1024,16 @@ func (r *ExecutableReconciler) computeExecutableEnvironment(ctx context.Context,
log.Info("Executable is being deleted while waiting to compute effective invocation arguments, aborting startup",
"Cause", err.Error())
markExecutableFailed()
return statusChanged
return false, statusChanged
} else {
log.Info("Could not compute effective invocation arguments for the Executable, retrying startup...",
"Cause", err.Error())
return r.setExecutableState(exe, apiv1.ExecutableStateStarting) | additionalReconciliationNeeded
return false, r.setExecutableState(exe, apiv1.ExecutableStateStarting) | additionalReconciliationNeeded
}
} else if err != nil {
log.Error(err, "Could not compute effective invocation arguments for the Executable")
markExecutableFailed()
return statusChanged
return false, statusChanged
}

if len(reservedServicePorts) > 0 {
Expand All @@ -1044,7 +1045,7 @@ func (r *ExecutableReconciler) computeExecutableEnvironment(ctx context.Context,
ri.ReservedPorts = reservedServicePorts
ri.startupStage = StartupStageDataInitialized
r.runs.Update(exe.NamespacedName(), ri.RunID, ri.Clone())
return noChange
return true, statusChanged // Status.EffectiveEnv and Status.EffectiveArgs were changed
}

func (r *ExecutableReconciler) prepareCertificateFiles(
Expand Down
2 changes: 1 addition & 1 deletion internal/exerunners/process_executable_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func (r *ProcessExecutableRunner) StartRun(
result.StdErrFile = ""
}

runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result)
result.StartupError = startErr
runChangeHandler.OnStartupCompleted(exe.NamespacedName(), result)
return result
} else {
// Use original log here, the watcher is a different process.
Expand Down
110 changes: 110 additions & 0 deletions internal/testutil/ctrlutil/test_process_executable_runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package ctrlutil

import (
"context"
"os/exec"
"sync"
"time"

"github.com/go-logr/logr"

apiv1 "github.com/microsoft/dcp/api/v1"
"github.com/microsoft/dcp/controllers"
"github.com/microsoft/dcp/internal/exerunners"
"github.com/microsoft/dcp/internal/testutil"
"github.com/microsoft/dcp/pkg/process"
"github.com/microsoft/dcp/pkg/slices"
)

// TestProcessExecutableRunner wraps an ExecutableRunner and allows tests to simulate
// asynchronous executable startup by delaying the call to the underlying runner.
type TestProcessExecutableRunner struct {
inner controllers.ExecutableRunner
autoExecutions []testutil.AutoExecution
m *sync.RWMutex
}

func NewTestProcessExecutableRunner(pe process.Executor) *TestProcessExecutableRunner {
return &TestProcessExecutableRunner{
inner: exerunners.NewProcessExecutableRunner(pe),
m: &sync.RWMutex{},
}
}

// InstallAsyncStartConfig adds an async start configuration.
// If a configuration with the same condition already exists, it will be replaced.
func (r *TestProcessExecutableRunner) InstallAutoExecution(ae testutil.AutoExecution) {
r.m.Lock()
defer r.m.Unlock()

// Remove any previous config that matches the same criteria.
withoutExisting := slices.Select(r.autoExecutions, func(existing testutil.AutoExecution) bool {
return !ae.Condition.Equals(&existing.Condition)
})

r.autoExecutions = append(withoutExisting, ae)
}

func (e *TestProcessExecutableRunner) RemoveAutoExecution(sc testutil.ProcessSearchCriteria) {
e.m.Lock()
defer e.m.Unlock()

e.autoExecutions = slices.Select(e.autoExecutions, func(ae testutil.AutoExecution) bool {
return !sc.Equals(&ae.Condition)
})
}

func (r *TestProcessExecutableRunner) StartRun(
ctx context.Context,
exe *apiv1.Executable,
runChangeHandler controllers.RunChangeHandler,
log logr.Logger,
) *controllers.ExecutableStartResult {
r.m.RLock()
cmd := exec.Command(exe.Spec.ExecutablePath)
cmd.Args = append([]string{exe.Spec.ExecutablePath}, exe.Status.EffectiveArgs...)
// Executable spec has other properties (env, working dir) but they are not needed for matching criteria.
i := slices.IndexFunc(r.autoExecutions, func(ae testutil.AutoExecution) bool { return ae.Condition.MatchesCmd(cmd) })
var ae *testutil.AutoExecution
if i != -1 {
ae = &r.autoExecutions[i]
}
r.m.RUnlock()

if ae == nil || ae.AsynchronousStartupDelay == 0 {
return r.inner.StartRun(ctx, exe, runChangeHandler, log)
}

// Start a goroutine to call the underlying runner after the delay.
go func() {
delay := ae.AsynchronousStartupDelay

select {
case <-ctx.Done():
return // Time out before delay elapsed.
case <-time.After(delay):
// Proceed to call the underlying runner.
}

// Call the underlying runner. It will call OnStartupCompleted() on the runChangeHandler.
// We don't need to do anything with the result here--it will be reported by the underlying runner
// via OnStartupCompleted().
_ = r.inner.StartRun(ctx, exe, runChangeHandler, log)
}()

result := controllers.NewExecutableStartResult()
result.ExeState = apiv1.ExecutableStateStarting
return result
}

// StopRun implements ExecutableRunner by delegating to the underlying runner.
func (r *TestProcessExecutableRunner) StopRun(ctx context.Context, runID controllers.RunID, log logr.Logger) error {
return r.inner.StopRun(ctx, runID, log)
}

var _ controllers.ExecutableRunner = (*TestProcessExecutableRunner)(nil)
Loading
Loading