Skip to content

Commit

Permalink
feat: Implement idempotent run v0 (#769)
Browse files Browse the repository at this point in the history
## Description:
This feature saves the current state of the enclave after each Starlark
run and tries to optimize the next execution so that it executes only the
instructions that changed compared to the previous run. For this V0, the
optimisation is "append-only", i.e. the resolution of instructions will
succeed only if what changed is at the end of the instruction queue.

## Is this change user facing?
NO
<!-- If yes, please add the "user facing" label to the PR -->
<!-- If yes, don't forget to include docs changes where relevant -->

## References (if applicable):
<!-- Add relevant Github Issues, Discord threads, or other helpful
information. -->
  • Loading branch information
Guillaume Bouvignies committed Jun 27, 2023
1 parent 0858f56 commit 23b121f
Show file tree
Hide file tree
Showing 17 changed files with 708 additions and 82 deletions.
Expand Up @@ -45,6 +45,7 @@ func (plan *InstructionsPlan) AddScheduledInstruction(scheduledInstruction *Sche
newScheduledInstructionUuid := scheduledInstruction.uuid
newScheduledInstruction := NewScheduledInstruction(newScheduledInstructionUuid, scheduledInstruction.kurtosisInstruction, scheduledInstruction.returnedValue)
newScheduledInstruction.Executed(scheduledInstruction.IsExecuted())
newScheduledInstruction.ImportedFromCurrentEnclavePlan(scheduledInstruction.IsImportedFromCurrentEnclavePlan())

plan.scheduledInstructionsIndex[newScheduledInstructionUuid] = newScheduledInstruction
plan.instructionsSequence = append(plan.instructionsSequence, newScheduledInstructionUuid)
Expand Down
@@ -0,0 +1,43 @@
package resolver

import "github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan"

// InstructionsPlanMask is a fixed-size, ordered list of scheduled-instruction slots that is read
// sequentially through a cursor, paired with a validity flag.
type InstructionsPlanMask struct {
// readIdx is the index of the slot that the next call to Next will return
readIdx int
// scheduledInstructions holds one slot per position; a slot stays nil until InsertAt fills it
scheduledInstructions []*instructions_plan.ScheduledInstruction
// isValid is set to true by NewInstructionsPlanMask and switched to false by MarkAsInvalid
isValid bool
}

// NewInstructionsPlanMask builds a mask holding `size` slots, all initially nil, with its read
// cursor positioned on the first slot.
func NewInstructionsPlanMask(size int) *InstructionsPlanMask {
	emptySlots := make([]*instructions_plan.ScheduledInstruction, size)

	newMask := new(InstructionsPlanMask)
	newMask.readIdx = 0
	newMask.scheduledInstructions = emptySlots
	// the mask is considered valid until it's proven to be invalid
	newMask.isValid = true
	return newMask
}

// InsertAt stores instruction in the slot at position idx, replacing whatever the slot held.
// idx must designate an existing slot; any other index triggers a run-time panic.
func (mask *InstructionsPlanMask) InsertAt(idx int, instruction *instructions_plan.ScheduledInstruction) {
	slots := mask.scheduledInstructions
	slots[idx] = instruction
}

// HasNext reports whether at least one slot is left to read past the current cursor position.
func (mask *InstructionsPlanMask) HasNext() bool {
	remainingSlots := len(mask.scheduledInstructions) - mask.readIdx
	return remainingSlots > 0
}

// Next returns the slot under the read cursor and advances the cursor by one. The returned value
// is nil when nothing was inserted at that position.
//
// Once every slot has been consumed, Next returns nil and leaves the cursor untouched rather than
// panicking with an index-out-of-range error, so a caller that did not check HasNext first gets
// the same "no masked instruction" result as for an empty slot.
func (mask *InstructionsPlanMask) Next() *instructions_plan.ScheduledInstruction {
	if mask.readIdx >= len(mask.scheduledInstructions) {
		return nil
	}
	scheduledInstruction := mask.scheduledInstructions[mask.readIdx]
	mask.readIdx++
	return scheduledInstruction
}

// Size returns the total number of slots in the mask, regardless of how many were already read.
func (mask *InstructionsPlanMask) Size() int {
return len(mask.scheduledInstructions)
}

// MarkAsInvalid flags the mask as invalid. Nothing in this type sets the flag back to true.
func (mask *InstructionsPlanMask) MarkAsInvalid() {
mask.isValid = false
}

// IsValid reports whether the mask is still flagged as valid, i.e. MarkAsInvalid was not called.
func (mask *InstructionsPlanMask) IsValid() bool {
return mask.isValid
}
Expand Up @@ -20,14 +20,17 @@ type ScheduledInstruction struct {
returnedValue starlark.Value

executed bool

importedFromCurrentEnclavePlan bool
}

// NewScheduledInstruction wraps kurtosisInstruction and its returnedValue under the given uuid.
// The new instruction starts out flagged as neither executed nor imported from the current
// enclave plan; use Executed and ImportedFromCurrentEnclavePlan to change those flags.
func NewScheduledInstruction(uuid ScheduledInstructionUuid, kurtosisInstruction kurtosis_instruction.KurtosisInstruction, returnedValue starlark.Value) *ScheduledInstruction {
	// Each field is keyed exactly once: repeating a key in a struct literal does not compile.
	return &ScheduledInstruction{
		uuid:                           uuid,
		kurtosisInstruction:            kurtosisInstruction,
		returnedValue:                  returnedValue,
		executed:                       false,
		importedFromCurrentEnclavePlan: false,
	}
}

Expand All @@ -47,3 +50,12 @@ func (instruction *ScheduledInstruction) Executed(isExecuted bool) *ScheduledIns
// IsExecuted returns the current value of the instruction's executed flag.
func (instruction *ScheduledInstruction) IsExecuted() bool {
return instruction.executed
}

// ImportedFromCurrentEnclavePlan sets the importedFromCurrentEnclavePlan flag to the given value
// and returns the same instruction, so the call can be chained with other setters.
func (instruction *ScheduledInstruction) ImportedFromCurrentEnclavePlan(importedFromCurrentEnclavePlan bool) *ScheduledInstruction {
instruction.importedFromCurrentEnclavePlan = importedFromCurrentEnclavePlan
return instruction
}

// IsImportedFromCurrentEnclavePlan returns the current value of the importedFromCurrentEnclavePlan flag.
func (instruction *ScheduledInstruction) IsImportedFromCurrentEnclavePlan() bool {
return instruction.importedFromCurrentEnclavePlan
}
Expand Up @@ -2,6 +2,7 @@ package plan_module

import (
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan/resolver"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/kurtosis_starlark_framework/kurtosis_plan_instruction"
"go.starlark.net/starlark"
"go.starlark.net/starlarkstruct"
Expand All @@ -13,11 +14,12 @@ const (

func PlanModule(
instructionsPlan *instructions_plan.InstructionsPlan,
instructionsPlanMask *resolver.InstructionsPlanMask,
kurtosisPlanInstructions []*kurtosis_plan_instruction.KurtosisPlanInstruction,
) *starlarkstruct.Module {
moduleBuiltins := starlark.StringDict{}
for _, planInstruction := range kurtosisPlanInstructions {
wrappedPlanInstruction := kurtosis_plan_instruction.NewKurtosisPlanInstructionWrapper(planInstruction, instructionsPlan)
wrappedPlanInstruction := kurtosis_plan_instruction.NewKurtosisPlanInstructionWrapper(planInstruction, instructionsPlanMask, instructionsPlan)
moduleBuiltins[planInstruction.GetName()] = starlark.NewBuiltin(planInstruction.GetName(), wrappedPlanInstruction.CreateBuiltin())
}

Expand Down
Expand Up @@ -2,6 +2,7 @@ package kurtosis_plan_instruction

import (
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan/resolver"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/kurtosis_starlark_framework"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/startosis_errors"
"go.starlark.net/starlark"
Expand All @@ -20,13 +21,16 @@ type KurtosisPlanInstruction struct {
// KurtosisPlanInstructionWrapper embeds a KurtosisPlanInstruction together with the instructions
// plan mask and the instructions plan used by the builtin that CreateBuiltin produces.
type KurtosisPlanInstructionWrapper struct {
*KurtosisPlanInstruction

// instructionPlanMask is read slot by slot by the builtin to look for an already scheduled
// instruction at the current position
instructionPlanMask *resolver.InstructionsPlanMask

// instructionsPlan is the plan the builtin adds instructions to
// TODO: This can be changed to KurtosisPlanInstructionInternal when we get rid of KurtosisInstruction
instructionsPlan *instructions_plan.InstructionsPlan
}

func NewKurtosisPlanInstructionWrapper(instruction *KurtosisPlanInstruction, instructionsPlan *instructions_plan.InstructionsPlan) *KurtosisPlanInstructionWrapper {
func NewKurtosisPlanInstructionWrapper(instruction *KurtosisPlanInstruction, instructionPlanMask *resolver.InstructionsPlanMask, instructionsPlan *instructions_plan.InstructionsPlan) *KurtosisPlanInstructionWrapper {
return &KurtosisPlanInstructionWrapper{
KurtosisPlanInstruction: instruction,
instructionPlanMask: instructionPlanMask,
instructionsPlan: instructionsPlan,
}
}
Expand All @@ -44,13 +48,34 @@ func (builtin *KurtosisPlanInstructionWrapper) CreateBuiltin() func(thread *star
return nil, interpretationErr
}

// before returning, automatically add instruction to queue
if err := builtin.instructionsPlan.AddInstruction(instructionWrapper, returnedFutureValue); err != nil {
return nil, startosis_errors.WrapWithInterpretationError(err,
"Unable to add Kurtosis instruction '%s' at position '%s' to the current plan being assembled. This is a Kurtosis internal bug",
instructionWrapper.String(),
instructionWrapper.GetPositionInOriginalScript().String())
var instructionPullFromMaskMaybe *instructions_plan.ScheduledInstruction
if builtin.instructionPlanMask.HasNext() {
instructionPullFromMaskMaybe = builtin.instructionPlanMask.Next()
if instructionPullFromMaskMaybe != nil && instructionPullFromMaskMaybe.GetInstruction().String() != instructionWrapper.String() {
// if the instructions differs, then the mask is invalid
builtin.instructionPlanMask.MarkAsInvalid()
// TODO: we could interrupt the interpretation here, because with an invalid mask the list of
// instructions generated will be invalid anyway. Though we currently don't have a nice way to
// interrupt an interpretation in progress (other than by throwing an error, which would be
// misleading here).
// To properly solve that, I think we should switch to an interactive interpretation where we
// interpret each instruction one after the other, and evaluate the state after each step
}
}

if instructionPullFromMaskMaybe != nil {
// If there's a mask for this instruction, add the masked instruction to the plan and return the mask's returned value
builtin.instructionsPlan.AddScheduledInstruction(instructionPullFromMaskMaybe).Executed(true).ImportedFromCurrentEnclavePlan(false)
return instructionPullFromMaskMaybe.GetReturnedValue(), nil
} else {
// otherwise add the instruction as a new one to the plan and return its own returned value
if err := builtin.instructionsPlan.AddInstruction(instructionWrapper, returnedFutureValue); err != nil {
return nil, startosis_errors.WrapWithInterpretationError(err,
"Unable to add Kurtosis instruction '%s' at position '%s' to the current plan being assembled. This is a Kurtosis internal bug",
instructionWrapper.String(),
instructionWrapper.GetPositionInOriginalScript().String())
}
return returnedFutureValue, nil
}
return returnedFutureValue, nil
}
}
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/builtins"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/instructions_plan/resolver"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/kurtosis_starlark_framework/builtin_argument"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/kurtosis_starlark_framework/kurtosis_plan_instruction"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/startosis_constants"
Expand Down Expand Up @@ -79,7 +80,8 @@ func testKurtosisPlanInstruction(t *testing.T, builtin KurtosisPlanInstructionBa
predeclared := getBasePredeclaredDict(t)
// Add the KurtosisPlanInstruction that is being tested
instructionFromBuiltin := builtin.GetInstruction()
instructionWrapper := kurtosis_plan_instruction.NewKurtosisPlanInstructionWrapper(instructionFromBuiltin, instructionsPlan)
emptyInstructionsPlanMask := resolver.NewInstructionsPlanMask(0)
instructionWrapper := kurtosis_plan_instruction.NewKurtosisPlanInstructionWrapper(instructionFromBuiltin, emptyInstructionsPlanMask, instructionsPlan)
predeclared[instructionWrapper.GetName()] = starlark.NewBuiltin(instructionWrapper.GetName(), instructionWrapper.CreateBuiltin())

starlarkCode := builtin.GetStarlarkCode()
Expand Down
Expand Up @@ -59,9 +59,16 @@ func (executor *StartosisExecutor) Execute(ctx context.Context, dryRun bool, par
// TODO: for now the plan is append only, as each Starlark run happens on top of whatever exists in the enclave
logrus.Debugf("Current enclave plan contains %d instuctions. About to process a new plan with %d instructions (dry-run: %v)",
executor.enclavePlan.Size(), len(instructionsSequence), dryRun)

executor.enclavePlan = instructions_plan.NewInstructionsPlan()
totalNumberOfInstructions := uint32(len(instructionsSequence))
for index, scheduledInstruction := range instructionsSequence {
instructionNumber := uint32(index + 1)

if scheduledInstruction.IsImportedFromCurrentEnclavePlan() {
executor.enclavePlan.AddScheduledInstruction(scheduledInstruction).Executed(true).ImportedFromCurrentEnclavePlan(true)
continue
}
progress := binding_constructors.NewStarlarkRunResponseLineFromSinglelineProgressInfo(
progressMsg, instructionNumber, totalNumberOfInstructions)
starlarkRunResponseLineStream <- progress
Expand Down Expand Up @@ -106,6 +113,10 @@ func (executor *StartosisExecutor) Execute(ctx context.Context, dryRun bool, par
return starlarkRunResponseLineStream
}

// GetCurrentEnclavePLan returns the executor's enclavePlan pointer as is (no copy is made).
// NOTE(review): the "PLan" capitalization looks like a typo, but the name is exported, so renaming
// it would break callers.
func (executor *StartosisExecutor) GetCurrentEnclavePLan() *instructions_plan.InstructionsPlan {
return executor.enclavePlan
}

func sendErrorAndFail(starlarkRunResponseLineStream chan<- *kurtosis_core_rpc_api_bindings.StarlarkRunResponseLine, err error, msg string, msgArgs ...interface{}) {
propagatedErr := stacktrace.Propagate(err, msg, msgArgs...)
serializedError := binding_constructors.NewStarlarkExecutionError(propagatedErr.Error())
Expand Down
Expand Up @@ -38,33 +38,39 @@ func TestExecuteKurtosisInstructions_ExecuteForReal_Success(t *testing.T) {

instructionsPlan := instructions_plan.NewInstructionsPlan()
instruction0 := createMockInstruction(t, "instruction0", executeSuccessfully)
scheduledInstruction0 := instructions_plan.NewScheduledInstruction("instruction0", instruction0, starlark.None).Executed(true)
scheduledInstruction0 := instructions_plan.NewScheduledInstruction("instruction0", instruction0, starlark.None).Executed(true).ImportedFromCurrentEnclavePlan(true)
instructionsPlan.AddScheduledInstruction(scheduledInstruction0)

instruction1 := createMockInstruction(t, "instruction1", executeSuccessfully)
scheduledInstruction1 := instructions_plan.NewScheduledInstruction("instruction1", instruction1, starlark.None).Executed(true)
instructionsPlan.AddScheduledInstruction(scheduledInstruction1)

instruction2 := createMockInstruction(t, "instruction2", executeSuccessfully)
require.NoError(t, instructionsPlan.AddInstruction(instruction1, starlark.None))
instruction3 := createMockInstruction(t, "instruction3", executeSuccessfully)
require.NoError(t, instructionsPlan.AddInstruction(instruction2, starlark.None))
require.NoError(t, instructionsPlan.AddInstruction(instruction3, starlark.None))

require.Equal(t, executor.enclavePlan.Size(), 0) // check that the enclave plan is empty prior to execution

_, serializedInstruction, err := executeSynchronously(t, executor, executeForReal, instructionsPlan)
instruction0.AssertNumberOfCalls(t, "GetCanonicalInstruction", 1)
instruction0.AssertNumberOfCalls(t, "Execute", 0) // not executed as it was already executed
instruction0.AssertNumberOfCalls(t, "GetCanonicalInstruction", 0) // skipped directly
instruction0.AssertNumberOfCalls(t, "Execute", 0)
instruction1.AssertNumberOfCalls(t, "GetCanonicalInstruction", 1)
instruction1.AssertNumberOfCalls(t, "Execute", 1)
instruction1.AssertNumberOfCalls(t, "Execute", 0) // not executed as it was already executed
instruction2.AssertNumberOfCalls(t, "GetCanonicalInstruction", 1)
instruction2.AssertNumberOfCalls(t, "Execute", 1)
instruction3.AssertNumberOfCalls(t, "GetCanonicalInstruction", 1)
instruction3.AssertNumberOfCalls(t, "Execute", 1)

require.Nil(t, err)

expectedSerializedInstructions := []*kurtosis_core_rpc_api_bindings.StarlarkInstruction{
binding_constructors.NewStarlarkInstruction(dummyPosition.ToAPIType(), "instruction0", "instruction0()", noInstructionArgsForTesting),
binding_constructors.NewStarlarkInstruction(dummyPosition.ToAPIType(), "instruction1", "instruction1()", noInstructionArgsForTesting),
binding_constructors.NewStarlarkInstruction(dummyPosition.ToAPIType(), "instruction2", "instruction2()", noInstructionArgsForTesting),
binding_constructors.NewStarlarkInstruction(dummyPosition.ToAPIType(), "instruction3", "instruction3()", noInstructionArgsForTesting),
}
require.Equal(t, expectedSerializedInstructions, serializedInstruction)
require.Equal(t, executor.enclavePlan.Size(), 3) // check that the enclave plan now contains the 3 instructions
require.Equal(t, executor.enclavePlan.Size(), 4) // check that the enclave plan now contains the 4 instructions
}

func TestExecuteKurtosisInstructions_ExecuteForReal_FailureHalfWay(t *testing.T) {
Expand Down

0 comments on commit 23b121f

Please sign in to comment.