Skip to content

Commit

Permalink
store future references from tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
tedim52 committed Mar 1, 2024
1 parent 745d695 commit ccc256b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 90 deletions.
34 changes: 12 additions & 22 deletions core/server/api_container/server/startosis_engine/plan.yml
@@ -1,27 +1,17 @@
packageId: github.com/kurtosis-tech/plan-yaml-prac
services:
- uuid: "1"
name: db
image:
name: postgres:alpine
envVars:
- key: POSTGRES_USER
value: tedi
- key: POSTGRES_PASSWORD
value: tedi
- key: POSTGRES_DB
value: tedi
- uuid: "2"
name: blob-spammer
name: database
image:
name: ethpandaops/tx-fuzz:master
name: postgres:latest
envVars:
- key: VAR_1
value: '{{ kurtosis.1.output }}'
- key: VAR_2
value: '{{ kurtosis.1.code }}'
tasks:
- uuid: "1"
taskType: sh
command:
- apk update && apk add curl jq && current_epoch=$(curl -s http://{{ kurtosis.1.ip_address
}}:5/eth/v2/beacon/blocks/head | jq -r ".version") && echo $current_epoch && while
[ $current_epoch != "deneb" ]; do echo "waiting for deneb, current epoch is $current_epoch";
current_epoch=$(curl -s http://{{ kurtosis.1.ip_address }}:5/eth/v2/beacon/blocks/head
| jq -r ".version"); sleep 1; done && echo "sleep is over, starting to send blob
transactions" && /tx-fuzz.bin blobs --rpc={{ kurtosis.1.ip_address }} --sk=["0x12"]
entrypoint:
- /bin/sh
- -c
- echo some stuff
image: badouralix/curl-jq
Expand Up @@ -401,16 +401,36 @@ func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromRenderTemplates(renderTempla

func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromRunSh(runShInstruction *instructions_plan.ScheduledInstruction) error {
var task *Task
var interpErr *startosis_errors.InterpretationError
taskUuid := pyg.generateUuid()

// store run sh future references
returnValue := runShInstruction.GetReturnedValue()
_, ok := returnValue.(*starlarkstruct.Struct)
runShStruct, ok := returnValue.(*starlarkstruct.Struct)
if !ok {
return stacktrace.NewError("Cast to service didn't work")
}
starlarkCodeVal, err := runShStruct.Attr("code")
if err != nil {
return err
}
starlarkCodeFutureRefStr, interpErr := kurtosis_types.SafeCastToString(starlarkCodeVal, "run sh code")
if interpErr != nil {
return interpErr
}
pyg.futureReferenceIndex[starlarkCodeFutureRefStr] = fmt.Sprintf("{{ kurtosis.%v.code }}", taskUuid)
starlarkOutputVal, err := runShStruct.Attr("output")
if err != nil {
return err
}
starlarkOutputFutureRefStr, interpErr := kurtosis_types.SafeCastToString(starlarkOutputVal, "run sh code")
if interpErr != nil {
return interpErr
}
pyg.futureReferenceIndex[starlarkOutputFutureRefStr] = fmt.Sprintf("{{ kurtosis.%v.output }}", taskUuid)

task = &Task{ //nolint:exhaustruct
Uuid: strconv.Itoa(pyg.generateUuid()),
Uuid: strconv.Itoa(taskUuid),
TaskType: SHELL,
}

Expand Down Expand Up @@ -533,11 +553,35 @@ func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromRunSh(runShInstruction *inst

func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromRunPython(runPythonInstruction *instructions_plan.ScheduledInstruction) error {
var task *Task
taskUuid := pyg.generateUuid()

// store future references
returnValue := runPythonInstruction.GetReturnedValue()
runPythonStruct, ok := returnValue.(*starlarkstruct.Struct)
if !ok {
return stacktrace.NewError("Cast to service didn't work")
}
starlarkCodeVal, err := runPythonStruct.Attr("code")
if err != nil {
return err
}
starlarkCodeFutureRefStr, interpErr := kurtosis_types.SafeCastToString(starlarkCodeVal, "run python code")
if interpErr != nil {
return interpErr
}
pyg.futureReferenceIndex[starlarkCodeFutureRefStr] = fmt.Sprintf("{{ kurtosis.%v.code }}", taskUuid)
starlarkOutputVal, err := runPythonStruct.Attr("output")
if err != nil {
return err
}
starlarkOutputFutureRefStr, interpErr := kurtosis_types.SafeCastToString(starlarkOutputVal, "run python output")
if interpErr != nil {
return interpErr
}
pyg.futureReferenceIndex[starlarkOutputFutureRefStr] = fmt.Sprintf("{{ kurtosis.%v.output }}", taskUuid)

task = &Task{ //nolint:exhaustruct
Uuid: strconv.Itoa(pyg.generateUuid()),
Uuid: strconv.Itoa(taskUuid),
TaskType: PYTHON,
}

Expand Down Expand Up @@ -700,16 +744,6 @@ func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromStoreServiceFiles(storeServi
}

arguments := storeServiceFilesInstruction.GetInstruction().GetArguments()
// set the uuid to be the uuid of the service that this files artifact comes from
//serviceName, err := builtin_argument.ExtractArgumentValue[starlark.String](arguments, store_service_files.ServiceNameArgName)
//if err != nil {
// return startosis_errors.WrapWithInterpretationError(err, "Unable to extract value for '%s' argument", store_service_files.ServiceNameArgName)
//}
//if service, ok := pyg.serviceIndex[serviceName.GoString()]; !ok {
// return startosis_errors.NewInterpretationError("A service that hasn't been tracked was found on a store service instruction.")
//} else {
// filesArtifact.Uuid = service.Uuid
//}

// parse for files
src, err := builtin_argument.ExtractArgumentValue[starlark.String](arguments, store_service_files.SrcArgName)
Expand All @@ -727,6 +761,32 @@ func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromStoreServiceFiles(storeServi
func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromExec(execInstruction *instructions_plan.ScheduledInstruction) error {
// TODO: update the plan yaml based on an add_service
var task *Task
taskUuid := pyg.generateUuid()

// store future references
returnValue := execInstruction.GetReturnedValue()
execStruct, ok := returnValue.(*starlarkstruct.Struct)
if !ok {
return stacktrace.NewError("Cast to service didn't work")
}
starlarkCodeVal, err := execStruct.Attr("code")
if err != nil {
return err
}
starlarkCodeFutureRefStr, interpErr := kurtosis_types.SafeCastToString(starlarkCodeVal, "exec code")
if interpErr != nil {
return interpErr
}
pyg.futureReferenceIndex[starlarkCodeFutureRefStr] = fmt.Sprintf("{{ kurtosis.%v.code }}", taskUuid)
starlarkOutputVal, err := execStruct.Attr("output")
if err != nil {
return err
}
starlarkOutputFutureRefStr, interpErr := kurtosis_types.SafeCastToString(starlarkOutputVal, "exec output")
if interpErr != nil {
return interpErr
}
pyg.futureReferenceIndex[starlarkOutputFutureRefStr] = fmt.Sprintf("{{ kurtosis.%v.output }}", taskUuid)

arguments := execInstruction.GetInstruction().GetArguments()
serviceNameArgumentValue, err := builtin_argument.ExtractArgumentValue[starlark.String](arguments, exec.ServiceNameArgName)
Expand All @@ -736,7 +796,7 @@ func (pyg *PlanYamlGeneratorImpl) updatePlanYamlFromExec(execInstruction *instru
task = &Task{ //nolint:exhaustruct
ServiceName: serviceNameArgumentValue.GoString(),
TaskType: EXEC,
Uuid: strconv.Itoa(pyg.generateUuid()),
Uuid: strconv.Itoa(taskUuid),
}

execRecipe, err := builtin_argument.ExtractArgumentValue[*recipe.ExecRecipe](arguments, exec.RecipeArgName)
Expand Down
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/kurtosis-tech/kurtosis/container-engine-lib/lib/database_accessors/enclave_db/file_artifacts_db"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/service_network"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/enclave_plan_persistence"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/interpretation_time_value_store"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/kurtosis_instruction/shared_helpers"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/runtime_value_store"
"github.com/kurtosis-tech/kurtosis/core/server/api_container/server/startosis_engine/startosis_constants"
Expand All @@ -23,11 +24,12 @@ import (

type PlanYamlGeneratorTestSuite struct {
suite.Suite
serviceNetwork *service_network.MockServiceNetwork
packageContentProvider *mock_package_content_provider.MockPackageContentProvider
runtimeValueStore *runtime_value_store.RuntimeValueStore
kurtosisBackend *backend_interface.KurtosisBackend
filesArtifactStore *enclave_data_directory.FilesArtifactStore
serviceNetwork *service_network.MockServiceNetwork
packageContentProvider *mock_package_content_provider.MockPackageContentProvider
runtimeValueStore *runtime_value_store.RuntimeValueStore
kurtosisBackend *backend_interface.KurtosisBackend
filesArtifactStore *enclave_data_directory.FilesArtifactStore
interpretationTimeValueStore *interpretation_time_value_store.InterpretationTimeValueStore

interpreter *StartosisInterpreter
validator *StartosisValidator
Expand All @@ -47,6 +49,10 @@ func (suite *PlanYamlGeneratorTestSuite) SetupTest() {
require.NoError(suite.T(), err)
suite.runtimeValueStore = runtimeValueStore

// mock interpretation time value store
interpretationTimeValueStore, err := interpretation_time_value_store.CreateInterpretationTimeValueStore(enclaveDb, dummySerde)
require.NoError(suite.T(), err)
suite.interpretationTimeValueStore = interpretationTimeValueStore
// mock service network
suite.serviceNetwork = service_network.NewMockServiceNetwork(suite.T())

Expand All @@ -66,7 +72,7 @@ func (suite *PlanYamlGeneratorTestSuite) SetupTest() {
"134123")
suite.serviceNetwork.EXPECT().GetApiContainerInfo().Return(apiContainerInfo)

suite.interpreter = NewStartosisInterpreter(suite.serviceNetwork, suite.packageContentProvider, suite.runtimeValueStore, nil, "")
suite.interpreter = NewStartosisInterpreter(suite.serviceNetwork, suite.packageContentProvider, suite.runtimeValueStore, nil, "", suite.interpretationTimeValueStore)

// mock kurtosis backend?

Expand Down Expand Up @@ -94,9 +100,9 @@ func (suite *PlanYamlGeneratorTestSuite) SetupTest() {
suite.runner = NewStartosisRunner(suite.interpreter, suite.validator, suite.executor)
}

//func TestRunPlanYamlGeneratorTestSuite(t *testing.T) {
// suite.Run(t, new(PlanYamlGeneratorTestSuite))
//}
func TestRunPlanYamlGeneratorTestSuite(t *testing.T) {
suite.Run(t, new(PlanYamlGeneratorTestSuite))
}

func (suite *PlanYamlGeneratorTestSuite) TearDownTest() {
suite.packageContentProvider.RemoveAll()
Expand Down Expand Up @@ -137,51 +143,17 @@ CMD ["node", "app.js"]
relativePathToMainFile := "main.star"

serializedScript := `def run(plan, args):
database = plan.add_service(
name="db",
config=ServiceConfig(
image="postgres:alpine",
env_vars = {
"POSTGRES_DB": "tedi",
"POSTGRES_USER": "tedi",
"POSTGRES_PASSWORD": "tedi",
}
)
)
dencunTime = (5 * 32 * 1) + 5
config = ServiceConfig(
image="ethpandaops/tx-fuzz:master",
entrypoint=["/bin/sh", "-c"],
cmd=[
" && ".join(
[
"apk update",
"apk add curl jq",
'current_epoch=$(curl -s http://{0}:{1}/eth/v2/beacon/blocks/head | jq -r ".version")'.format(
database.ip_address, 5
),
"echo $current_epoch",
'while [ $current_epoch != "deneb" ]; do echo "waiting for deneb, current epoch is $current_epoch"; current_epoch=$(curl -s http://{0}:{1}/eth/v2/beacon/blocks/head | jq -r ".version"); sleep {2}; done'.format(
database.ip_address,
5,
1,
),
'echo "sleep is over, starting to send blob transactions"',
"/tx-fuzz.bin blobs --rpc={} --sk={}".format(
database.ip_address,
["0x12"],
),
]
)
],
min_cpu=100,
max_cpu=1000,
min_memory=256,
max_memory=512,
node_selectors={"smth":"smth"},
)
plan.add_service("blob-spammer", config)
result = plan.run_sh(
run="echo some stuff",
)
database = plan.add_service(name="database", config=ServiceConfig(
image="postgres:latest",
env_vars={
"VAR_1": result.output,
"VAR_2": result.code
}
))
`
serializedJsonParams := "{}"
_, instructionsPlan, interpretationError := suite.interpreter.Interpret(context.Background(), packageId, mainFunctionName, noPackageReplaceOptions, relativePathToMainFile, serializedScript, serializedJsonParams, defaultNonBlockingMode, emptyEnclaveComponents, emptyInstructionsPlanMask)
Expand Down

0 comments on commit ccc256b

Please sign in to comment.